diff --git a/dist/singlestore_pulse-0.1-py3-none-any.whl b/dist/singlestore_pulse-0.1-py3-none-any.whl index db76cf9..ea97ecd 100644 Binary files a/dist/singlestore_pulse-0.1-py3-none-any.whl and b/dist/singlestore_pulse-0.1-py3-none-any.whl differ diff --git a/dist/singlestore_pulse-0.1.tar.gz b/dist/singlestore_pulse-0.1.tar.gz index 06f9884..8512c53 100644 Binary files a/dist/singlestore_pulse-0.1.tar.gz and b/dist/singlestore_pulse-0.1.tar.gz differ diff --git a/pulse_otel/__init__.py b/pulse_otel/__init__.py index 557f1d3..5eea417 100644 --- a/pulse_otel/__init__.py +++ b/pulse_otel/__init__.py @@ -1 +1 @@ -from pulse_otel.main import Pulse, CustomFileSpanExporter, FileLogExporter, pulse_agent, pulse_tool +from pulse_otel.main import Pulse, CustomFileSpanExporter, FileLogExporter, pulse_agent, pulse_tool, healthcheck diff --git a/pulse_otel/consts.py b/pulse_otel/consts.py index a7e254e..804459a 100644 --- a/pulse_otel/consts.py +++ b/pulse_otel/consts.py @@ -2,6 +2,7 @@ OTEL_COLLECTOR_ENDPOINT = "http://otel-collector-{PROJECTID_PLACEHOLDER}.observability.svc.cluster.local:4317" HEADER_INCOMING_SESSION_ID = "singlestore-session-id" +TRACEID_RESPONSE_HEADER = "singlestore-trace-id" # Formatted attribute names APP_TYPE = "singlestore.nova.app.type" diff --git a/pulse_otel/main.py b/pulse_otel/main.py index e0f2052..6098469 100644 --- a/pulse_otel/main.py +++ b/pulse_otel/main.py @@ -2,7 +2,9 @@ import os from traceloop.sdk import Traceloop from traceloop.sdk.decorators import agent, tool -from opentelemetry import _logs +from opentelemetry import _logs, trace + +from opentelemetry.trace import get_current_span from opentelemetry.context import attach, set_value from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, LogData @@ -37,6 +39,7 @@ HEADER_INCOMING_SESSION_ID, PROJECT, LIVE_LOGS_FILE_PATH, + TRACEID_RESPONSE_HEADER ) import logging @@ -181,58 +184,26 @@ def init_log_provider(self): logging.root.setLevel(logging.INFO) logging.root.addHandler(handler) return log_exporter + - def pulse_add_session_id(self, session_id=None, **kwargs): + @staticmethod + def add_traceid_header(result: Response, traceID: str) -> Response: """ - Decorator to set Traceloop association properties for a function. - - Parameters: - - session_id: Optional session_id identifier - - **kwargs: Any additional association properties + Adds a trace ID header to the response. """ - def decorator(func): - def wrapper(*args, **kwargs_inner): - - properties = {} - if session_id: - properties["session_id"] = session_id - properties.update(kwargs) - - # Set the association properties - Traceloop.set_association_properties(properties) - return func(*args, **kwargs_inner) - return wrapper - return decorator - - - def add_traceid_header(self, func: Callable) -> Callable: - @wraps(func) - async def wrapper(request: Request, *args, **kwargs) -> Response: - # Generate unique trace ID - trace_id = str(uuid.uuid4()) - - # Extract session ID from request headers if present - session_id = request.headers.get("X-SINGLESTORE-AI-SESSION-ID", "N/A") - - try: - # Execute the original function - result = await func(request, *args, **kwargs) - - # If result is already a Response object - if isinstance(result, Response): - result.headers["X-SINGLESTORE-TRACE-ID"] = trace_id - return result - - return JSONResponse( - content=result, - headers={"X-SINGLESTORE-TRACE-ID": trace_id} - ) - - except Exception as e: - raise e - - return wrapper + try: + # If result is already a Response object + if isinstance(result, Response): + result.headers[TRACEID_RESPONSE_HEADER] = traceID + return result + return JSONResponse( + content=result, + headers={TRACEID_RESPONSE_HEADER: traceID} + ) + except Exception as e: + print(f"Error adding trace ID header: {e}") + return result def pulse_tool(_func=None, *, name=None): """ @@ -290,79 +261,107 @@ def add_session_id_to_span_attributes(kwargs): print("[pulse_agent] No singlestore-session-id found in baggage.") def pulse_agent(_func=None, *, name=None): - """ - A decorator that integrates with the SingleStore Pulse agent to associate - session IDs with function calls for tracing purposes. It extracts the - session ID from the `baggage` header if available, or generates a random - session ID if not. The session ID is then set as an association property - for tracing. - - Args: - _func (callable, optional): The function to be decorated. Defaults to None. - name (str, optional): The name to be used for the agent. If not provided, - it defaults to the function name. - - Returns: - callable: The wrapped function with tracing capabilities. - - Notes: - - If a session ID is found in the `baggage` header, it is used for tracing. - - If no session ID is found, a random session ID is generated. - - The `Traceloop.set_association_properties` method is used to set the - session ID as an association property. - - The `agent` function is used to wrap the original function with the - resolved name. - - Example: - @pulse_agent(name="my_app") - def my_function(headers): - # Function logic here - pass - - @pulse_agent - def my_function(headers): - # Function logic here - pass - - # Works with other decorators: - @pulse_agent(name="my_app") - @retry(stop=stop_after_attempt(3)) - def my_function(headers): - # Function logic here - pass - """ - def decorator(func): - # Use the provided name or fall back to the function's name - agent_name = name or func.__name__ - - # Apply the agent decorator to the function - decorated_func = agent(agent_name)(func) - - @functools.wraps(func) - def wrapper(*args, **kwargs): - add_session_id_to_span_attributes(kwargs) - return decorated_func(*args, **kwargs) - - return wrapper - - if _func is None: - # Called as @pulse_agent() or @pulse_agent(name="...") - return decorator - elif isinstance(_func, str): - # Called as @pulse_agent("name") - backward compatibility - def wrapper(func): - agent_name = _func - decorated_func = agent(agent_name)(func) - - @functools.wraps(func) - def inner(*args, **kwargs): - add_session_id_to_span_attributes(kwargs) - return decorated_func(*args, **kwargs) - return inner - return wrapper - else: - # Called as @pulse_agent (without parentheses) - return decorator(_func) + """ + A decorator that integrates with the SingleStore Pulse agent to associate + session IDs with function calls for tracing purposes. It extracts the + session ID from the `baggage` header if available, or generates a random + session ID if not. The session ID is then set as an association property + for tracing. + + Args: + _func (callable, optional): The function to be decorated. Defaults to None. + name (str, optional): The name to be used for the agent. If not provided, + it defaults to the function name. + + Returns: + callable: The wrapped function with tracing capabilities. + + Notes: + - If a session ID is found in the `baggage` header, it is used for tracing. + - If no session ID is found, a random session ID is generated. + - The `Traceloop.set_association_properties` method is used to set the + session ID as an association property. + - The `agent` function is used to wrap the original function with the + resolved name. + + Example: + @pulse_agent(name="my_app") + def my_function(headers): + # Function logic here + pass + + @pulse_agent + def my_function(headers): + # Function logic here + pass + + # Works with other decorators: + @pulse_agent(name="my_app") + @retry(stop=stop_after_attempt(3)) + def my_function(headers): + # Function logic here + pass + """ + + def decorator(func): + # Use the provided name or fall back to the function's name + agent_name = name or func.__name__ + + # Apply the agent decorator to the function + decorated_func = agent(agent_name)(func) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + add_session_id_to_span_attributes(kwargs) + tracer = trace.get_tracer(agent_name) + with tracer.start_as_current_span(agent_name): + result = decorated_func(*args, **kwargs) + + span = get_current_span() + trace_id = span.get_span_context().trace_id + + # Convert to hex string (OpenTelemetry trace IDs are 16-byte ints) + trace_id_hex = format(trace_id, "032x") + result = Pulse.add_traceid_header(result, trace_id_hex) + return result + + return wrapper + + if _func is None: + # Called as @pulse_agent() or @pulse_agent(name="...") + return decorator + elif isinstance(_func, str): + # Called as @pulse_agent("name") - backward compatibility + def wrapper(func): + agent_name = _func + decorated_func = agent(agent_name)(func) + + @functools.wraps(func) + def inner(*args, **kwargs): + add_session_id_to_span_attributes(kwargs) + tracer = trace.get_tracer(agent_name) + + with tracer.start_as_current_span(agent_name): + result = decorated_func(*args, **kwargs) + + span = get_current_span() + trace_id = span.get_span_context().trace_id + + trace_id_hex = format(trace_id, "032x") + result = Pulse.add_traceid_header(result, trace_id_hex) + return result + return inner + return wrapper + else: + # Called as @pulse_agent (without parentheses) + return decorator(_func) + +def healthcheck(): + """ + Health check endpoint for the Pulse service. + Returns a JSON response indicating the service is healthy. + """ + return {"status": "healthy"} class CustomFileSpanExporter(SpanExporter): def __init__(self, file_name):