Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
cc8c8fd
Add TRACEID_RESPONSE_HEADER constant and utility function to add trac…
aanshu-ss Jun 7, 2025
07049bf
Enhance tracing in pulse_agent function by integrating OpenTelemetry …
aanshu-ss Jun 8, 2025
0db0bf0
Refactor pulse_agent function to simplify trace ID injection logic an…
aanshu-ss Jun 8, 2025
c59798e
Add logging to pulse_agent function for better traceability and debug…
aanshu-ss Jun 8, 2025
3283452
attach context
aanshu-ss Jun 9, 2025
718a114
Refactor pulse_agent decorator to enhance tracing and logging, ensuri…
aanshu-ss Jun 9, 2025
4f9a943
async tracing
aanshu-ss Jun 9, 2025
efbaef9
async2
aanshu-ss Jun 9, 2025
8132659
Add pulse_agent2 decorator for enhanced async context management and …
aanshu-ss Jun 9, 2025
a0ae0d4
Refactor pulse_agent2 to s2_agent for improved async context handling…
aanshu-ss Jun 9, 2025
d1556d3
Refactor pulse_agent and s2_agent decorators to enhance tracing, cont…
aanshu-ss Jun 9, 2025
697843c
Bump version to 0.2 and add new distribution files; enhance s2_agent …
aanshu-ss Jun 9, 2025
b187239
Rename s2_agent to s2_agent1 to avoid conflicts; update imports accor…
aanshu-ss Jun 9, 2025
1da84ff
Implement s2_agent1 decorator for enhanced async context management a…
aanshu-ss Jun 9, 2025
05f069b
Add configuration files for PyPI and environment setup; create TODO f…
aanshu-ss Jun 9, 2025
0bace4a
Add constants and utility functions for OpenTelemetry integration
aanshu-ss Jun 10, 2025
3764d38
Refactor pulse_agent decorator to improve readability and maintainabi…
aanshu-ss Jun 10, 2025
e1d5161
Remove deprecated files and update pulse_agent to pulse_agent2; adjus…
aanshu-ss Jun 12, 2025
a5c1185
Remove pulse_agent2 definition and restore pulse_agent function; upda…
aanshu-ss Jun 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added dist/singlestore_pulse-0.2-py3-none-any.whl
Binary file not shown.
Binary file added dist/singlestore_pulse-0.2.tar.gz
Binary file not shown.
9 changes: 9 additions & 0 deletions examples/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

perma_auth_token = <API_KEY>

api_uri = https://apps.aws-virginia-nb2.svc.singlestore.com:8000/modelasaservice/34003ccb-470d-4605-8afb-c699e69adb9d/v1

model_name = Qwen/Qwen2.5-7B-Instruct

#DEV or PROD
environment = DEV
1 change: 1 addition & 0 deletions pulse_otel/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
OTEL_COLLECTOR_ENDPOINT = "http://otel-collector-{PROJECTID_PLACEHOLDER}.observability.svc.cluster.local:4317"

HEADER_INCOMING_SESSION_ID = "singlestore-session-id"
TRACEID_RESPONSE_HEADER = "singlestore-trace-id"

# Formatted attribute names
APP_TYPE = "singlestore.nova.app.type"
Expand Down
115 changes: 57 additions & 58 deletions pulse_otel/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import os
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import agent, tool
from opentelemetry import _logs
from opentelemetry import _logs, trace


from opentelemetry.context import attach, set_value
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, LogData
Expand All @@ -14,13 +15,11 @@
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from fastapi import Request, Response
from fastapi.responses import JSONResponse

from contextvars import copy_context

from functools import wraps
import uuid
import logging
from typing import Callable
import typing

from pulse_otel.util import (
Expand All @@ -32,14 +31,14 @@
from pulse_otel.consts import (
LOCAL_TRACES_FILE,
LOCAL_LOGS_FILE,
SESSION_ID,
HEADER_INCOMING_SESSION_ID,
PROJECT,
LIVE_LOGS_FILE_PATH,
)
import logging

_pulse_instance = None
logger = logging.getLogger(__name__)


class Pulse:
def __init__(
Expand Down Expand Up @@ -202,35 +201,6 @@ def init_log_provider(self):
logging.root.addHandler(handler)
return log_exporter

def add_traceid_header(self, func: Callable) -> Callable:
@wraps(func)
async def wrapper(request: Request, *args, **kwargs) -> Response:
# Generate unique trace ID
trace_id = str(uuid.uuid4())

# Extract session ID from request headers if present
session_id = request.headers.get("X-SINGLESTORE-AI-SESSION-ID", "N/A")

try:
# Execute the original function
result = await func(request, *args, **kwargs)

# If result is already a Response object
if isinstance(result, Response):
result.headers["X-SINGLESTORE-TRACE-ID"] = trace_id
return result

return JSONResponse(
content=result,
headers={"X-SINGLESTORE-TRACE-ID": trace_id}
)

except Exception as e:
raise e

return wrapper


def pulse_tool(_func=None, *, name=None):
"""
Decorator to register a function as a tool. Can be used as @pulse_tool, @pulse_tool("name"), or @pulse_tool(name="name").
Expand Down Expand Up @@ -274,38 +244,67 @@ def wrapper(func):
# Called as @pulse_tool (without parentheses)
return decorator(_func)

def pulse_agent(name):
"""
A decorator factory that wraps a function with additional tracing and session ID logic.
# def pulse_agent(name):
# """
# A decorator factory that wraps a function with additional tracing and session ID logic.

Args:
name (str): The name to be used for the agent decorator.
# Args:
# name (str): The name to be used for the agent decorator.

Returns:
function: A decorator that wraps the target function, adding session ID to span attributes
before invoking the decorated agent function.
# Returns:
# function: A decorator that wraps the target function, adding session ID to span attributes
# before invoking the decorated agent function.

Usage:
@pulse_agent("my_agent")
def my_function(...):
...
# Usage:
# @pulse_agent("my_agent")
# def my_function(...):
# ...

@pulse_agent(name="my_agent")
def my_function(...):
...
"""
def decorator(func):
decorated_func = agent(name)(func)
# @pulse_agent(name="my_agent")
# def my_function(...):
# ...
# """
# def decorator(func):
# decorated_func = agent(name)(func)

# @functools.wraps(func)
# def wrapper(*args, **kwargs):
# add_session_id_to_span_attributes(**kwargs)
# return decorated_func(*args, **kwargs)

# return wrapper

# return decorator

def pulse_agent(name):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
add_session_id_to_span_attributes(**kwargs)
return decorated_func(*args, **kwargs)

logger.info(f"[s2_agent wrapper] hi")

ctx = copy_context()
trace_callback = kwargs.pop("trace_callback", None) # Accept optional callback

async def async_wrapper():
add_session_id_to_span_attributes(**kwargs)
tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span(name) as span:
trace_id_hex = format(span.get_span_context().trace_id, "032x")
logger.debug(f"[s2_agent wrapper] Started span. TraceID: {trace_id_hex}")

if trace_callback:
trace_callback(trace_id_hex) # Notify the caller

decorated_func = agent(name)(func)
async for item in decorated_func(*args, **kwargs):
yield item

return ctx.run(async_wrapper)
return wrapper

return decorator


class CustomFileSpanExporter(SpanExporter):
def __init__(self, file_name):
self.file_name = file_name
Expand Down
22 changes: 21 additions & 1 deletion pulse_otel/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
ENV_VARIABLES_MAPPING,
HEADER_INCOMING_SESSION_ID,
SESSION_ID,
TRACEID_RESPONSE_HEADER,
)
import random
from fastapi import Request, Response

from traceloop.sdk import Traceloop

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -208,3 +210,21 @@ def add_session_id_to_span_attributes(**kwargs):
SESSION_ID: session_id,
}
Traceloop.set_association_properties(properties)

def add_traceid_header(result: Response, traceID: str) -> Response:
"""
Adds a trace ID header to the response.
"""
try:
# If result is already a Response object
if isinstance(result, Response):
result.headers[TRACEID_RESPONSE_HEADER] = traceID
return result
return JSONResponse(
content=result,
headers={TRACEID_RESPONSE_HEADER: traceID}
)

except Exception as e:
print(f"Error adding trace ID header: {e}")
return result
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='singlestore_pulse',
version='0.1',
version='0.1.0.1',
packages=find_packages(),
description='Singlestore Python SDK for OpenTelemetry Integration',
long_description=open('README.md').read(),
Expand Down