diff --git a/dist/singlestore_pulse-0.2-py3-none-any.whl b/dist/singlestore_pulse-0.2-py3-none-any.whl new file mode 100644 index 0000000..e939edf Binary files /dev/null and b/dist/singlestore_pulse-0.2-py3-none-any.whl differ diff --git a/dist/singlestore_pulse-0.2.tar.gz b/dist/singlestore_pulse-0.2.tar.gz new file mode 100644 index 0000000..109c32b Binary files /dev/null and b/dist/singlestore_pulse-0.2.tar.gz differ diff --git a/examples/.env b/examples/.env new file mode 100644 index 0000000..fb8ddc8 --- /dev/null +++ b/examples/.env @@ -0,0 +1,9 @@ + +perma_auth_token = + +api_uri = https://apps.aws-virginia-nb2.svc.singlestore.com:8000/modelasaservice/34003ccb-470d-4605-8afb-c699e69adb9d/v1 + +model_name = Qwen/Qwen2.5-7B-Instruct + +#DEV or PROD +environment = DEV diff --git a/pulse_otel/consts.py b/pulse_otel/consts.py index a7e254e..804459a 100644 --- a/pulse_otel/consts.py +++ b/pulse_otel/consts.py @@ -2,6 +2,7 @@ OTEL_COLLECTOR_ENDPOINT = "http://otel-collector-{PROJECTID_PLACEHOLDER}.observability.svc.cluster.local:4317" HEADER_INCOMING_SESSION_ID = "singlestore-session-id" +TRACEID_RESPONSE_HEADER = "singlestore-trace-id" # Formatted attribute names APP_TYPE = "singlestore.nova.app.type" diff --git a/pulse_otel/main.py b/pulse_otel/main.py index 5a55c32..ce1e03d 100644 --- a/pulse_otel/main.py +++ b/pulse_otel/main.py @@ -2,7 +2,8 @@ import os from traceloop.sdk import Traceloop from traceloop.sdk.decorators import agent, tool -from opentelemetry import _logs +from opentelemetry import _logs, trace + from opentelemetry.context import attach, set_value from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, LogData @@ -14,13 +15,11 @@ from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( OTLPLogExporter, ) -from fastapi import Request, Response -from fastapi.responses import JSONResponse + +from contextvars import copy_context from functools import wraps -import uuid import logging -from typing import Callable import typing from pulse_otel.util import ( @@ -32,14 +31,14 @@ from pulse_otel.consts import ( LOCAL_TRACES_FILE, LOCAL_LOGS_FILE, - SESSION_ID, - HEADER_INCOMING_SESSION_ID, PROJECT, LIVE_LOGS_FILE_PATH, ) import logging _pulse_instance = None +logger = logging.getLogger(__name__) + class Pulse: def __init__( @@ -202,35 +201,6 @@ def init_log_provider(self): logging.root.addHandler(handler) return log_exporter - def add_traceid_header(self, func: Callable) -> Callable: - @wraps(func) - async def wrapper(request: Request, *args, **kwargs) -> Response: - # Generate unique trace ID - trace_id = str(uuid.uuid4()) - - # Extract session ID from request headers if present - session_id = request.headers.get("X-SINGLESTORE-AI-SESSION-ID", "N/A") - - try: - # Execute the original function - result = await func(request, *args, **kwargs) - - # If result is already a Response object - if isinstance(result, Response): - result.headers["X-SINGLESTORE-TRACE-ID"] = trace_id - return result - - return JSONResponse( - content=result, - headers={"X-SINGLESTORE-TRACE-ID": trace_id} - ) - - except Exception as e: - raise e - - return wrapper - - def pulse_tool(_func=None, *, name=None): """ Decorator to register a function as a tool. Can be used as @pulse_tool, @pulse_tool("name"), or @pulse_tool(name="name"). @@ -274,38 +244,67 @@ def wrapper(func): # Called as @pulse_tool (without parentheses) return decorator(_func) -def pulse_agent(name): - """ - A decorator factory that wraps a function with additional tracing and session ID logic. +# def pulse_agent(name): +# """ +# A decorator factory that wraps a function with additional tracing and session ID logic. - Args: - name (str): The name to be used for the agent decorator. +# Args: +# name (str): The name to be used for the agent decorator. - Returns: - function: A decorator that wraps the target function, adding session ID to span attributes - before invoking the decorated agent function. +# Returns: +# function: A decorator that wraps the target function, adding session ID to span attributes +# before invoking the decorated agent function. - Usage: - @pulse_agent("my_agent") - def my_function(...): - ... +# Usage: +# @pulse_agent("my_agent") +# def my_function(...): +# ... - @pulse_agent(name="my_agent") - def my_function(...): - ... - """ - def decorator(func): - decorated_func = agent(name)(func) +# @pulse_agent(name="my_agent") +# def my_function(...): +# ... +# """ +# def decorator(func): +# decorated_func = agent(name)(func) + +# @functools.wraps(func) +# def wrapper(*args, **kwargs): +# add_session_id_to_span_attributes(**kwargs) +# return decorated_func(*args, **kwargs) +# return wrapper + +# return decorator + +def pulse_agent(name): + def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): - add_session_id_to_span_attributes(**kwargs) - return decorated_func(*args, **kwargs) - + logger.info(f"[s2_agent wrapper] hi") + + ctx = copy_context() + trace_callback = kwargs.pop("trace_callback", None) # Accept optional callback + + async def async_wrapper(): + add_session_id_to_span_attributes(**kwargs) + tracer = trace.get_tracer(__name__) + + with tracer.start_as_current_span(name) as span: + trace_id_hex = format(span.get_span_context().trace_id, "032x") + logger.debug(f"[s2_agent wrapper] Started span. TraceID: {trace_id_hex}") + + if trace_callback: + trace_callback(trace_id_hex) # Notify the caller + + decorated_func = agent(name)(func) + async for item in decorated_func(*args, **kwargs): + yield item + + return ctx.run(async_wrapper) return wrapper - return decorator + class CustomFileSpanExporter(SpanExporter): def __init__(self, file_name): self.file_name = file_name diff --git a/pulse_otel/util.py b/pulse_otel/util.py index aba4d13..4e1fc71 100644 --- a/pulse_otel/util.py +++ b/pulse_otel/util.py @@ -10,8 +10,10 @@ ENV_VARIABLES_MAPPING, HEADER_INCOMING_SESSION_ID, SESSION_ID, + TRACEID_RESPONSE_HEADER, ) -import random +from fastapi import Request, Response + from traceloop.sdk import Traceloop logger = logging.getLogger(__name__) @@ -208,3 +210,21 @@ def add_session_id_to_span_attributes(**kwargs): SESSION_ID: session_id, } Traceloop.set_association_properties(properties) + +def add_traceid_header(result: Response, traceID: str) -> Response: + """ + Adds a trace ID header to the response. + """ + try: + # If result is already a Response object + if isinstance(result, Response): + result.headers[TRACEID_RESPONSE_HEADER] = traceID + return result + return JSONResponse( + content=result, + headers={TRACEID_RESPONSE_HEADER: traceID} + ) + + except Exception as e: + print(f"Error adding trace ID header: {e}") + return result diff --git a/setup.py b/setup.py index d4f2177..83d3cb3 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='singlestore_pulse', - version='0.1', + version='0.1.0.1', packages=find_packages(), description='Singlestore Python SDK for OpenTelemetry Integration', long_description=open('README.md').read(),