Skip to content

Commit 17e3a0c

Browse files
committed
Refactor pulse_agent and s2_agent decorators to enhance tracing, context management, and documentation
1 parent 479a044 commit 17e3a0c

File tree

4 files changed

+70
-52
lines changed

4 files changed

+70
-52
lines changed
155 Bytes
Binary file not shown.

dist/singlestore_pulse-0.1.tar.gz

161 Bytes
Binary file not shown.

pulse_otel/main.py

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -245,59 +245,77 @@ def wrapper(func):
245245
return decorator(_func)
246246

247247
def pulse_agent(name):
248-
def decorator(func):
249-
@functools.wraps(func)
250-
async def wrapper(*args, **kwargs):
251-
add_session_id_to_span_attributes(**kwargs)
252-
logger.info(f"Executing agent: {name} with args: {args}, kwargs: {kwargs}")
253-
tracer = trace.get_tracer(__name__)
254-
255-
with tracer.start_as_current_span(name) as span:
256-
# Get current context with the span
257-
current_context = trace.set_span_in_context(span)
258-
259-
trace_id_hex = format(span.get_span_context().trace_id, "032x")
260-
logger.info(f"[wrapper] Started span. TraceID: {trace_id_hex}")
261-
262-
# Run the decorated function within the context
263-
decorated_func = agent(name)(func)
264-
265-
# Ensure the async function runs in the correct context
266-
import contextvars
267-
result = await contextvars.copy_context().run(
268-
lambda: decorated_func(*args, **kwargs)
269-
)
270-
271-
logger.info(f"Agent {name} executed with trace ID: {trace_id_hex}")
272-
return result
273-
return wrapper
274-
return decorator
248+
"""
249+
A decorator factory that wraps a function with additional tracing and session ID logic.
250+
251+
Args:
252+
name (str): The name to be used for the agent decorator.
253+
254+
Returns:
255+
function: A decorator that wraps the target function, adding session ID to span attributes
256+
before invoking the decorated agent function.
257+
258+
Usage:
259+
@pulse_agent("my_agent")
260+
def my_function(...):
261+
...
262+
263+
@pulse_agent(name="my_agent")
264+
def my_function(...):
265+
...
266+
"""
267+
def decorator(func):
268+
decorated_func = agent(name)(func)
269+
270+
@functools.wraps(func)
271+
def wrapper(*args, **kwargs):
272+
add_session_id_to_span_attributes(**kwargs)
273+
return decorated_func(*args, **kwargs)
274+
275+
return wrapper
276+
277+
return decorator
275278

276279
def s2_agent(name):
277-
def decorator(func):
278-
@functools.wraps(func)
279-
def wrapper(*args, **kwargs):
280-
# Capture the current context
281-
ctx = copy_context()
282-
283-
async def async_wrapper():
284-
add_session_id_to_span_attributes(**kwargs)
285-
tracer = trace.get_tracer(__name__)
286-
287-
# Create new span within the async context
288-
with tracer.start_as_current_span(name) as span:
289-
trace_id_hex = format(span.get_span_context().trace_id, "032x")
290-
logger.debug(f"[s2_agent wrapper] Started span. TraceID: {trace_id_hex}")
291-
292-
# Execute decorated function and yield results
293-
decorated_func = agent(name)(func)
294-
async for item in decorated_func(*args, **kwargs):
295-
yield item
296-
297-
# Run the entire async generator in captured context
298-
return ctx.run(async_wrapper)
299-
return wrapper
300-
return decorator
280+
"""
281+
s2_agent is a dedicated decorator for for event_generator function of s2ai framework that
282+
wraps an asynchronous event_generator function to add tracing
283+
capabilities using OpenTelemetry. It captures the current context, creates
284+
a new span, and ensures that the decorated function is executed within the
285+
captured context.
286+
287+
This is being done to make sure parent context is rightfully captured and propagated in a sync function so that
288+
the traceID of the current otel span is used is passed on the asynchronous generator function.
289+
Args:
290+
name (str): The name of the span to be created for tracing.
291+
Returns:
292+
Callable: A decorator that wraps the target asynchronous generator
293+
function, adding tracing and context management.
294+
"""
295+
def decorator(func):
296+
@functools.wraps(func)
297+
def wrapper(*args, **kwargs):
298+
# Capture the current context
299+
ctx = copy_context()
300+
301+
async def async_wrapper():
302+
add_session_id_to_span_attributes(**kwargs)
303+
tracer = trace.get_tracer(__name__)
304+
305+
# Create new span within the async context
306+
with tracer.start_as_current_span(name) as span:
307+
trace_id_hex = format(span.get_span_context().trace_id, "032x")
308+
logger.debug(f"[s2_agent wrapper] Started span. TraceID: {trace_id_hex}")
309+
310+
# Execute decorated function and yield results
311+
decorated_func = agent(name)(func)
312+
async for item in decorated_func(*args, **kwargs):
313+
yield item
314+
315+
# Run the entire async generator in captured context
316+
return ctx.run(async_wrapper)
317+
return wrapper
318+
return decorator
301319

302320
class CustomFileSpanExporter(SpanExporter):
303321
def __init__(self, file_name):

pulse_otel/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def add_session_id_to_span_attributes(**kwargs):
185185
SESSION_ID: session_id,
186186
}
187187
Traceloop.set_association_properties(properties)
188-
188+
189189
def add_traceid_header(result: Response, traceID: str) -> Response:
190190
"""
191191
Adds a trace ID header to the response.

0 commit comments

Comments
 (0)