diff --git a/dist/singlestore_pulse-0.3-py3-none-any.whl b/dist/singlestore_pulse-0.3-py3-none-any.whl index 6abf318..5454784 100644 Binary files a/dist/singlestore_pulse-0.3-py3-none-any.whl and b/dist/singlestore_pulse-0.3-py3-none-any.whl differ diff --git a/dist/singlestore_pulse-0.3.tar.gz b/dist/singlestore_pulse-0.3.tar.gz index ce9a4a8..f33842d 100644 Binary files a/dist/singlestore_pulse-0.3.tar.gz and b/dist/singlestore_pulse-0.3.tar.gz differ diff --git a/examples/otel-collector/Dockerfile.go-webapp b/examples/otel-collector/Dockerfile.go-webapp new file mode 100644 index 0000000..a726636 --- /dev/null +++ b/examples/otel-collector/Dockerfile.go-webapp @@ -0,0 +1,16 @@ +FROM golang:1.22-alpine + +WORKDIR /app + +# Copy the entire pulse_go_otel_instrumentor directory to maintain package structure +COPY ./pulse_go_otel_instrumentor/ ./ + +# Download dependencies +RUN go mod download + +# Build the main application from the main subdirectory +RUN go build -o main ./main + +EXPOSE 8093 + +CMD ["./main/main"] diff --git a/examples/otel-collector/Dockerfile b/examples/otel-collector/Dockerfile.myapp similarity index 84% rename from examples/otel-collector/Dockerfile rename to examples/otel-collector/Dockerfile.myapp index 4eba6f5..6b39842 100644 --- a/examples/otel-collector/Dockerfile +++ b/examples/otel-collector/Dockerfile.myapp @@ -7,9 +7,9 @@ COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt RUN pip3 install git+https://github.com/singlestore-labs/singlestore-pulse.git@master -COPY main.py . +COPY myapp.py . COPY .env . EXPOSE 8000 -CMD ["python", "main.py"] +CMD ["python", "myapp.py"] diff --git a/examples/otel-collector/Dockerfile.myapp_2 b/examples/otel-collector/Dockerfile.myapp_2 new file mode 100644 index 0000000..e828252 --- /dev/null +++ b/examples/otel-collector/Dockerfile.myapp_2 @@ -0,0 +1,17 @@ +# Dockerfile +FROM python:3.11 + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY singlestore_pulse-0.3-py3-none-any.whl . +RUN pip install --no-cache-dir singlestore_pulse-0.3-py3-none-any.whl +# RUN pip3 install git+https://github.com/singlestore-labs/singlestore-pulse.git@master + +COPY myapp_2.py . +COPY .env . + +EXPOSE 8000 + +CMD ["python", "myapp_2.py"] diff --git a/examples/otel-collector/docker-compose.yml b/examples/otel-collector/docker-compose.yml index e22f8c8..7eae1fb 100644 --- a/examples/otel-collector/docker-compose.yml +++ b/examples/otel-collector/docker-compose.yml @@ -4,13 +4,32 @@ services: myapp: build: context: . - dockerfile: Dockerfile + dockerfile: Dockerfile.myapp ports: - "8007:8000" depends_on: - otel-collector networks: - otel-network + myapp_2: + build: + context: . + dockerfile: Dockerfile.myapp_2 + ports: + - "8008:8000" + depends_on: + - otel-collector + networks: + - otel-network + + go-webapp: + build: + context: . + dockerfile: Dockerfile.go-webapp + ports: + - "8009:8093" + networks: + - otel-network otel-collector: image: otel/opentelemetry-collector:latest diff --git a/examples/otel-collector/myapp.py b/examples/otel-collector/myapp.py new file mode 100644 index 0000000..7061ff4 --- /dev/null +++ b/examples/otel-collector/myapp.py @@ -0,0 +1,337 @@ +from dotenv import load_dotenv +import os +import datetime +import json +from fastapi import FastAPI, HTTPException +from fastapi import Request +import uvicorn +from pydantic import BaseModel + +from openai import OpenAI +import requests +from fastapi.responses import JSONResponse +from opentelemetry.sdk._logs import LoggingHandler + +from pulse_otel import Pulse, pulse_agent, pulse_tool + +import logging +from tenacity import retry, stop_after_attempt, wait_fixed + +app = FastAPI(title="My time agent", description="A FastAPI app that uses Pulse OTel for tracing and logging", version="1.0.0") + +# Define a Pydantic model for the request body +class AgentRunRequest(BaseModel): + prompt: str + session_id: str = None # Optional session ID + +class Item(BaseModel): + id: int + name: str + price: float + +logger = logging.getLogger("myapp") +logger.setLevel(logging.DEBUG) + +def get_configs(): + """ + Reads and returns configurations from the .env file. + """ + load_dotenv() # Load environment variables from .env file + configs = { + "perma_auth_token": os.getenv("perma_auth_token"), + "api_uri": os.getenv("api_uri"), + "model_name": os.getenv("model_name"), + } + return configs + +# Define available tools +tools = [ + { + "type": "function", + "function": { + "name": "get_current_time", + "description": "Get the current time in HH:MM:SS format", + "parameters": {} + } + }, + { + "type": "function", + "function": { + "name": "get_current_date", + "description": "Get the current date in YYYY-MM-DD format", + "parameters": {} + } + }, + { + "type": "function", + "function": { + "name": "get_funny_current_time", + "description": "Get the current time in HH:MM:SS format with a funny phrase", + "parameters": { + "type": "object", + "properties": { + "funny_phrase": { + "type": "string", + "description": "A humorous phrase to include with the time" + } + }, + "required": ["funny_phrase"] + } + } + } +] + +# Define a simple tool: a function to get the current time +@pulse_tool() +def get_current_time(): + logger.info("TEST LOGS get_current_time") + logger.debug("DEBUG LOGS get_current_time") + logger.critical("CRITICAL LOGS get_current_time") + return datetime.datetime.now().strftime("%H:%M:%S") + +# Define a new tool: a function to get the current date +@pulse_tool(name="ToolA") +def get_current_date(): + logger.critical("CRITICAL LOGS of get_current_date") + logger.debug("DEBUG LOGS of get_current_date") + logger.info("INFO LOGS of get_current_date") + return datetime.datetime.now().strftime("%Y-%m-%d") + +# Define a new tool: a function to get the current time with a funny phrase +@retry(stop=stop_after_attempt(3), wait=wait_fixed(2)) +@pulse_tool("toolB") +def get_funny_current_time(funny_phrase): + logger.critical("CRITICAL LOGS of get_funny_current_time") + logger.debug("DEBUG LOGS of get_funny_current_time") + logger.info("INFO LOGS of get_funny_current_time") + + current_time = datetime.datetime.now().strftime("%H:%M:%S") + funny_timestamp = f"{funny_phrase}! The time is {current_time}" + return get_funny_timestamp_phrase(funny_timestamp) + +def get_funny_timestamp_phrase(funny_timestamp): + logger.info("TEST LOGS get_funny_timestamp_phrase") + logger.debug("DEBUG LOGS get_funny_timestamp_phrase") + logger.critical("CRITICAL LOGS get_funny_timestamp_phrase") + return f"Here is a funny timestamp: {funny_timestamp}" + +# Simple agent function to process user input and decide on tool use +@app.post("/agent/run") +@pulse_agent("MyDockerTimeAgent") +def agent_run(request: Request, body: AgentRunRequest): # Changed back to sync function + try: + prompt = body.prompt + messages = [{"role": "user", "content": prompt}] + + configs = get_configs() + + # Validate required configs + if not configs["perma_auth_token"] or not configs["api_uri"] or not configs["model_name"]: + raise HTTPException(status_code=500, detail="Missing required configuration") + + client = OpenAI( + api_key=configs["perma_auth_token"], + base_url=configs["api_uri"], + ) + + # Make a chat completion request with tools + response = client.chat.completions.create( + model=configs["model_name"], + messages=messages, + tools=tools, + tool_choice="auto", + extra_headers={"X-Session-ID": "session_id"}, + ) + + # Check if the response involves a tool call + if response.choices[0].message.tool_calls: + results = [] + for tool_call in response.choices[0].message.tool_calls: + if tool_call.function.name == "get_current_time": + result = get_current_time() + results.append(result) + elif tool_call.function.name == "get_current_date": + result = get_current_date() + results.append(result) + elif tool_call.function.name == "get_funny_current_time": + arguments = json.loads(tool_call.function.arguments) + funny_phrase = arguments.get("funny_phrase", "Just kidding") + result = get_funny_current_time(funny_phrase) + results.append(result) + + # Return the first result (or combine multiple results if needed) + return {"response": results[0] if len(results) == 1 else results} + else: + return {"response": response.choices[0].message.content} + + except Exception as e: + logger.error(f"Error in agent_run: {str(e)}") + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +def http_req(body: Item): + """ + Makes an HTTP request to the myapp_2 service at the /cftocf endpoint. + """ + url = "http://myapp_2:8000/getdata" + + data = { + "name": body.name, + "price": body.price, + "id": body.id + } + + # Set headers + headers = { + "Content-Type": "application/json" + } + + try: + # Make the POST request with timeout + response = requests.post(url, json=data, headers=headers, timeout=30) + + # Check if the request was successful + response.raise_for_status() + + # Log successful response + logger.info(f"HTTP request to myapp_2 successful. Status Code: {response.status_code}") + + return { + "status": "success", + "status_code": response.status_code, + "response": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text + } + + except requests.exceptions.Timeout: + logger.error("HTTP request to myapp_2 timed out") + raise HTTPException(status_code=504, detail="Request to myapp_2 service timed out") + + except requests.exceptions.ConnectionError: + logger.error("Failed to connect to myapp_2 service") + raise HTTPException(status_code=502, detail="Failed to connect to myapp_2 service") + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP error occurred when calling myapp_2: {e}") + raise HTTPException(status_code=response.status_code, detail=f"myapp_2 service error: {response.text}") + + except requests.exceptions.RequestException as e: + logger.error(f"Request error occurred when calling myapp_2: {e}") + raise HTTPException(status_code=500, detail="Failed to make request to myapp_2 service") + +# Define a health check endpoint +@app.get("/health") +def health_check(): + return {"status": "ok"} + +# Define the root endpoint for FastAPI +@app.get("/") +def root(): + return {"message": "Welcome to the Pulse OTel FastAPI agent!"} + +# @pulse_agent("MyCloudFunctionToCloudFunctionAgent") +@app.post("/cloud_function_to_cloud_function") +@pulse_agent("MyCloudFunctionToCloudFunctionAgent") +def agent_run_2(request: Request, body: Item): + try: + return http_req(body) + except Exception as e: + logger.error(f"Error in cloud_function_to_cloud_function: {str(e)}") + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +def http_req_go_service(body: Item): + """ + Makes an HTTP request to the Go webapp service at the /api/process endpoint. + """ + url = "http://go-webapp:8000/api/process" + + data = { + "id": body.id, + "name": body.name, + "price": body.price + } + + headers = { + "Content-Type": "application/json" + } + + try: + # Make the POST request with timeout + response = requests.post(url, json=data, headers=headers, timeout=30) + + # Check if the request was successful + response.raise_for_status() + + # Log successful response + logger.info(f"HTTP request to go-webapp successful. Status Code: {response.status_code}") + + return { + "status": "success", + "status_code": response.status_code, + "response": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text + } + + except requests.exceptions.Timeout: + logger.error("HTTP request to go-webapp timed out") + raise HTTPException(status_code=504, detail="Request to go-webapp service timed out") + + except requests.exceptions.ConnectionError: + logger.error("Failed to connect to go-webapp service") + raise HTTPException(status_code=502, detail="Failed to connect to go-webapp service") + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP error occurred when calling go-webapp: {e}") + raise HTTPException(status_code=response.status_code, detail=f"go-webapp service error: {response.text}") + + except requests.exceptions.RequestException as e: + logger.error(f"Request error occurred when calling go-webapp: {e}") + raise HTTPException(status_code=500, detail="Failed to make request to go-webapp service") + +@pulse_agent("MyGoServiceAgent") +@app.post("/call-go-service") +def call_go_service(request: Request, body: Item): + """ + Endpoint that calls the Go webapp service + """ + try: + return http_req_go_service(body) + except Exception as e: + logger.error(f"Error in call-go-service: {str(e)}") + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +@app.post("/go_py_py") +def cftocf_endpoint(request: Request, body: Item): + """ + This is the target endpoint that myapp will call. + It processes the item and returns a response. + """ + try: + # Process the item (you can add your business logic here) + processed_data = { + "id": body.id, + "name": body.name, + "price": body.price + } + + logger.info(f"Successfully processed item {body.id}") + + # Call the http_req function to make an HTTP request to another service + return { + "status": "success", + "data": processed_data, + "message": "Item processed successfully" + } + + except Exception as e: + logger.error(f"Error in cftocf endpoint: {str(e)}") + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +def main(): + # write to otel collector + # _ = Pulse( + # otel_collector_endpoint="http://otel-collector:4317", + # ) + + # Create a FastAPI app + uvicorn.run(app, host="0.0.0.0", port=8000) + +if __name__ == "__main__": + main() diff --git a/examples/otel-collector/main.py b/examples/otel-collector/myapp_2.py similarity index 62% rename from examples/otel-collector/main.py rename to examples/otel-collector/myapp_2.py index 40e494e..4369f27 100644 --- a/examples/otel-collector/main.py +++ b/examples/otel-collector/myapp_2.py @@ -2,6 +2,8 @@ import os import datetime import json +import requests + from fastapi import FastAPI, HTTPException from fastapi import Request import uvicorn @@ -11,7 +13,7 @@ from opentelemetry.sdk._logs import LoggingHandler -from pulse_otel import Pulse, pulse_agent, pulse_tool +from pulse_otel import Pulse, pulse_agent, pulse_tool, observe import logging from tenacity import retry, stop_after_attempt, wait_fixed @@ -23,6 +25,11 @@ class AgentRunRequest(BaseModel): prompt: str session_id: str = None # Optional session ID +class Item(BaseModel): + id: int + name: str + price: float + logger = logging.getLogger("myapp") logger.setLevel(logging.DEBUG) @@ -172,6 +179,110 @@ def health_check(): def root(): return {"message": "Welcome to the Pulse OTel FastAPI agent!"} +@pulse_agent("getdata") +@app.post("/getdata") +def cftocf_endpoint(request: Request, body: Item): + """ + This is the target endpoint that myapp will call. + It processes Item data and returns a response. + """ + try: + logger.info(f"Received getdata request for item: {body.name} with id: {body.id}") + + # Process the item (you can add your business logic here) + processed_data = { + "message": f"Successfully processed item: {body.name}", + "item_id": body.id, + "item_name": body.name, + "item_price": body.price, + "processed_at": datetime.datetime.now().isoformat(), + "status": "success" + } + + logger.info(f"Successfully processed item {body.id}") + return processed_data + + except Exception as e: + logger.error(f"Error in cftocf endpoint: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to process item: {str(e)}") + +def http_req(body: Item): + """ + Makes an HTTP request to the myapp_2 service at the /cftocf endpoint. + """ + url = "http://myapp:8000/go_py_py" + + data = { + "name": body.name, + "price": body.price, + "id": body.id + } + + # Set headers + headers = { + "Content-Type": "application/json" + } + + try: + # Make the POST request with timeout + response = requests.post(url, json=data, headers=headers, timeout=30) + + # Check if the request was successful + response.raise_for_status() + + # Log successful response + logger.info(f"HTTP request to myapp_2 successful. Status Code: {response.status_code}") + + return { + "status": "success", + "status_code": response.status_code, + "response": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text + } + + except requests.exceptions.Timeout: + logger.error("HTTP request to myapp_2 timed out") + raise HTTPException(status_code=504, detail="Request to myapp_2 service timed out") + + except requests.exceptions.ConnectionError: + logger.error("Failed to connect to myapp_2 service") + raise HTTPException(status_code=502, detail="Failed to connect to myapp_2 service") + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP error occurred when calling myapp_2: {e}") + raise HTTPException(status_code=response.status_code, detail=f"myapp_2 service error: {response.text}") + + except requests.exceptions.RequestException as e: + logger.error(f"Request error occurred when calling myapp_2: {e}") + raise HTTPException(status_code=500, detail="Failed to make request to myapp_2 service") + + +@app.post("/go_py_py") +@observe("cftocf_endpoint") +def cftocf_endpoint(request: Request, body: Item): + """ + This is the target endpoint that myapp will call. + It processes Item data and returns a response. + """ + try: + logger.info(f"Received go_py_py request for item: {body.name} with id: {body.id}") + + # Process the item (you can add your business logic here) + processed_data = { + "id": body.id, + "name": body.name, + "price": body.price, + } + + logger.info(f"Successfully processed item {body.id}") + + + + return http_req(body) + + except Exception as e: + logger.error(f"Error in cftocf endpoint: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to process item: {str(e)}") + def main(): # write to otel collector _ = Pulse( diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/config.go b/examples/otel-collector/pulse_go_otel_instrumentor/config.go new file mode 100644 index 0000000..90e6ed3 --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/config.go @@ -0,0 +1,246 @@ +package pulse_otel + +import ( + "context" + "fmt" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" +) + +// Config holds OpenTelemetry configuration +type Config struct { + ServiceName string + ServiceVersion string + Environment string + Headers map[string]string + Timeout time.Duration + ResourceAttributes map[string]string +} + +// DefaultConfig returns a default configuration +func DefaultConfig() *Config { + return &Config{ + ServiceName: "default-service", + ServiceVersion: "1.0.0", + Environment: "development", + Headers: make(map[string]string), + Timeout: 30 * time.Second, + ResourceAttributes: make(map[string]string), + } +} + +// AddResourceAttribute adds a resource attribute to the configuration +func (c *Config) AddResourceAttribute(key, value string) { + if c.ResourceAttributes == nil { + c.ResourceAttributes = make(map[string]string) + } + c.ResourceAttributes[key] = value +} + +// AddHeader adds a header for the OTLP exporter +func (c *Config) AddHeader(key, value string) { + if c.Headers == nil { + c.Headers = make(map[string]string) + } + c.Headers[key] = value +} + +type ProjectTraceProvider struct { + traceProvider *trace.TracerProvider + collectorEndpointURL string // URL of the OTLP collector for this project + isCollectorReachable bool + mutex sync.RWMutex // Protects isCollectorReachable +} + +// PulseTraceManager manages OpenTelemetry providers for multiple projects +type PulseTraceManager struct { + projectTraceProviders map[string]*ProjectTraceProvider + baseConfig *Config + mutex sync.RWMutex +} + +// NewPulseTraceManager creates a new pulse trace manager +func NewPulseTraceManager(baseConfig *Config) *PulseTraceManager { + if baseConfig == nil { + baseConfig = DefaultConfig() + } + + return &PulseTraceManager{ + projectTraceProviders: make(map[string]*ProjectTraceProvider), + baseConfig: baseConfig, + } +} + +// IsCollectorReachable safely reads the collector reachability status +func (ptp *ProjectTraceProvider) IsCollectorReachable() bool { + ptp.mutex.RLock() + defer ptp.mutex.RUnlock() + return ptp.isCollectorReachable +} + +// SetCollectorReachable safely updates the collector reachability status +func (ptp *ProjectTraceProvider) SetCollectorReachable(reachable bool) { + ptp.mutex.Lock() + defer ptp.mutex.Unlock() + ptp.isCollectorReachable = reachable +} + +// CheckAndUpdateCollectorReachability checks if collector is reachable and updates the status +func (tm *PulseTraceManager) CheckAndUpdateCollectorReachability(projectID string) (bool, error) { + provider, err := tm.GetTracerProvider(projectID) + if err != nil { + return false, err + } + + // Check current status first + currentStatus := provider.IsCollectorReachable() + + // If already marked as reachable, return without checking again to avoid overhead + if currentStatus { + return true, nil + } + + // Check actual reachability + isReachable := isReachable(provider.collectorEndpointURL, 3*time.Second) + + // Update the status + provider.SetCollectorReachable(isReachable) + + return isReachable, nil +} + +// GetTracerProvider returns or creates a tracer provider for a specific project +func (tm *PulseTraceManager) GetTracerProvider(projectID string) (*ProjectTraceProvider, error) { + tm.mutex.RLock() + provider, exists := tm.projectTraceProviders[projectID] + tm.mutex.RUnlock() + + if exists { + return provider, nil + } + + tm.mutex.Lock() + defer tm.mutex.Unlock() + + // Double-check after acquiring write lock + if provider, exists := tm.projectTraceProviders[projectID]; exists { + return provider, nil + } + + // Create new provider for project + provider, err := tm.createProjectTraceProvider(projectID) + if err != nil { + return nil, fmt.Errorf("failed to create provider for project %s: %w", projectID, err) + } + + tm.projectTraceProviders[projectID] = provider + return provider, nil +} + +func (tm *PulseTraceManager) createProjectTraceProvider(projectID string) (*ProjectTraceProvider, error) { + ctx := context.Background() + + // Create project-specific resource + res, err := tm.createProjectTraceResource(projectID) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + // Form project-specific collector endpoint + // collectorEndpointURL := strings.Replace(OTEL_COLLECTOR_ENDPOINT, "{PROJECTID_PLACEHOLDER}", projectID, 1) + collectorEndpointURL := "otel-collector:4318" + + isCollectorReachable := isReachable(collectorEndpointURL, 3*time.Second) + + // Create OTLP HTTP exporter for this project + exporter, err := otlptracehttp.New(ctx, + otlptracehttp.WithEndpoint(collectorEndpointURL), + otlptracehttp.WithHeaders(tm.baseConfig.Headers), + otlptracehttp.WithTimeout(tm.baseConfig.Timeout), + otlptracehttp.WithInsecure(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + // Create tracer provider + provider := trace.NewTracerProvider( + trace.WithBatcher(exporter), + trace.WithResource(res), + trace.WithSampler(trace.AlwaysSample()), + ) + + return &ProjectTraceProvider{ + traceProvider: provider, + isCollectorReachable: isCollectorReachable, // Assume reachable initially + collectorEndpointURL: collectorEndpointURL, + }, nil +} + +func (tm *PulseTraceManager) createProjectTraceResource(projectID string) (*resource.Resource, error) { + // Start with base attributes + attributes := []attribute.KeyValue{ + semconv.ServiceName(tm.baseConfig.ServiceName), + semconv.ServiceVersion(tm.baseConfig.ServiceVersion), + semconv.DeploymentEnvironment(tm.baseConfig.Environment), + attribute.String("project.id", projectID), + } + + // Add custom resource attributes + for key, value := range tm.baseConfig.ResourceAttributes { + attributes = append(attributes, attribute.String(key, value)) + } + + return resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + attributes..., + ), + ) +} + +// Shutdown gracefully shuts down all project providers +func (tm *PulseTraceManager) Shutdown(ctx context.Context) error { + tm.mutex.Lock() + defer tm.mutex.Unlock() + + var errors []error + for projectID, provider := range tm.projectTraceProviders { + if provider.traceProvider != nil { + if err := provider.traceProvider.Shutdown(ctx); err != nil { + errors = append(errors, fmt.Errorf("failed to shutdown trace provider for project %s: %w", projectID, err)) + } + } + } + + if len(errors) > 0 { + return fmt.Errorf("shutdown errors: %v", errors) + } + + return nil +} + +// GetTracer returns a project-specific tracer +func (tm *PulseTraceManager) GetTracer(projectID, tracerName string) (*Tracer, error) { + provider, err := tm.GetTracerProvider(projectID) + if err != nil { + return nil, err + } + + if provider.traceProvider == nil { + return nil, fmt.Errorf("trace provider for project %s is not initialized", projectID) + } + + tracer := provider.traceProvider.Tracer(tracerName) + return &Tracer{ + tracer: tracer, + name: tracerName, + }, nil +} diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/const.go b/examples/otel-collector/pulse_go_otel_instrumentor/const.go new file mode 100644 index 0000000..30796cc --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/const.go @@ -0,0 +1,3 @@ +package pulse_otel + +const OTEL_COLLECTOR_ENDPOINT = "otel-collector-{PROJECTID_PLACEHOLDER}.observability.svc.cluster.local:4317" diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/go.mod b/examples/otel-collector/pulse_go_otel_instrumentor/go.mod new file mode 100644 index 0000000..9d93888 --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/go.mod @@ -0,0 +1,30 @@ +module github.com/aanshu-ss/s2-otel-instrumentation-go + +go 1.22 + +toolchain go1.24.4 + +require ( + go.opentelemetry.io/otel v1.30.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 + go.opentelemetry.io/otel/sdk v1.30.0 + go.opentelemetry.io/otel/trace v1.30.0 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.30.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/grpc v1.66.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/go.sum b/examples/otel-collector/pulse_go_otel_instrumentor/go.sum new file mode 100644 index 0000000..dab23ef --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/go.sum @@ -0,0 +1,49 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 h1:digkEZCJWobwBqMwC0cwCq8/wkkRy/OowZg5OArWZrM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0/go.mod h1:/OpE/y70qVkndM0TrxT4KBoN3RsFZP0QaofcfYrj76I= +go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= +go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= +go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE= +go.opentelemetry.io/otel/sdk v1.30.0/go.mod h1:p14X4Ok8S+sygzblytT1nqG98QG2KYKv++HE0LY/mhg= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 h1:hjSy6tcFQZ171igDaN5QHOw2n6vx40juYbC/x67CEhc= +google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:qpvKtACPCQhAdu3PyQgV4l3LMXZEtft7y8QcarRsp9I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.1 h1:hO5qAXR19+/Z44hmvIM4dQFMSYX9XcWsByfoxutBpAM= +google.golang.org/grpc v1.66.1/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/main/main.go b/examples/otel-collector/pulse_go_otel_instrumentor/main/main.go new file mode 100644 index 0000000..dad8527 --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/main/main.go @@ -0,0 +1,312 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "strconv" + "time" + + pulse_otel "github.com/aanshu-ss/s2-otel-instrumentation-go" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +type User struct { + ID string `json:"id"` + Name string `json:"name"` + Email string `json:"email"` +} + +type Response struct { + Success bool `json:"success"` + Data interface{} `json:"data,omitempty"` + Error string `json:"error,omitempty"` +} + +var ( + middleware *pulse_otel.HTTPMiddleware // Only keep the middleware as global +) + +func main() { + // Initialize OpenTelemetry base configuration + config := pulse_otel.DefaultConfig() + config.ServiceName = "user-api" + config.ServiceVersion = "1.0.0" + config.Environment = "development" + config.AddResourceAttribute("api.type", "rest") + config.AddResourceAttribute("team", "backend") + + // Create HTTP middleware with project support (this creates project manager internally) + middleware = pulse_otel.NewHTTPMiddleware("user-api", config) + + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + middleware.Shutdown(ctx) // Shutdown through middleware + }() + + // Setup routes with instrumentation + http.Handle("/users", middleware.Handler(http.HandlerFunc(getUsersHandler))) + http.Handle("/users/create", middleware.Handler(http.HandlerFunc(createUserHandler))) + http.Handle("/health", middleware.Handler(http.HandlerFunc(healthHandler))) + http.Handle("/my-post", middleware.Handler(http.HandlerFunc(myPostHandler))) + + log.Println("Starting server on :8093") + log.Fatal(http.ListenAndServe(":8093", nil)) +} + +func getUsersHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Get project-specific tracer + projectID := r.Header.Get("x-project-id") + if projectID == "" { + projectID = "default" + } + + tracer, err := middleware.GetPulseTraceManager().GetTracer(projectID, "user-api") + if err != nil { + log.Printf("Error getting tracer for project %s: %v", projectID, err) + writeErrorResponse(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Add custom span attributes + tracer.AddSpanAttributes(ctx, + attribute.String("handler.name", "getUsers"), + attribute.String("operation.type", "read"), + attribute.String("project.id", projectID), + ) + + // Simulate database call with child span + users, err := fetchUsersFromDB(ctx, tracer) + if err != nil { + tracer.RecordError(ctx, err) + writeErrorResponse(w, "Failed to fetch users", http.StatusInternalServerError) + return + } + + tracer.AddSpanAttribute(ctx, "users.count", strconv.Itoa(len(users))) + writeSuccessResponse(w, users) +} + +func createUserHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Get project-specific tracer + projectID := r.Header.Get("x-project-id") + if projectID == "" { + projectID = "default" + } + + tracer, err := middleware.GetPulseTraceManager().GetTracer(projectID, "user-api") + if err != nil { + log.Printf("Error getting tracer for project %s: %v", projectID, err) + writeErrorResponse(w, "Internal server error", http.StatusInternalServerError) + return + } + + var user User + if err := json.NewDecoder(r.Body).Decode(&user); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + // Add request attributes + tracer.AddSpanAttributes(ctx, + attribute.String("user.name", user.Name), + attribute.String("user.email", user.Email), + attribute.String("project.id", projectID), + ) + + // Create user in database + createdUser, err := createUserInDB(ctx, user, tracer) + if err != nil { + tracer.RecordError(ctx, err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Add response attributes + tracer.AddSpanAttributes(ctx, + attribute.String("created.user_id", createdUser.ID), + attribute.String("response.status", "created"), + ) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(createdUser) +} + +// Update database functions to accept tracer parameter +func fetchUsersFromDB(ctx context.Context, tracer *pulse_otel.Tracer) ([]User, error) { + return pulse_otel.WithSpanReturnTyped(tracer, ctx, "db.fetch_users", func(ctx context.Context) ([]User, error) { + tracer.AddSpanAttributes(ctx, + attribute.String("db.operation", "SELECT"), + attribute.String("db.table", "users"), + ) + + // Simulate database latency + time.Sleep(50 * time.Millisecond) + + users := []User{ + {ID: "1", Name: "John Doe", Email: "john@example.com"}, + {ID: "2", Name: "Jane Smith", Email: "jane@example.com"}, + } + + tracer.AddSpanAttribute(ctx, "db.rows_affected", strconv.Itoa(len(users))) + return users, nil + }) +} + +func createUserInDB(ctx context.Context, user User, tracer *pulse_otel.Tracer) (User, error) { + return pulse_otel.WithSpanReturnTyped(tracer, ctx, "db.create_user", func(ctx context.Context) (User, error) { + tracer.AddSpanAttributes(ctx, + attribute.String("db.operation", "INSERT"), + attribute.String("db.table", "users"), + attribute.String("user.email", user.Email), + ) + + // Simulate database write latency + time.Sleep(100 * time.Millisecond) + + user.ID = fmt.Sprintf("user_%d", time.Now().Unix()) + tracer.AddSpanAttribute(ctx, "user.id", user.ID) + return user, nil + }) +} + +func healthHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Get project-specific tracer + projectID := r.Header.Get("x-project-id") + if projectID == "" { + projectID = "default" + } + + tracer, err := middleware.GetPulseTraceManager().GetTracer(projectID, "user-api") + if err != nil { + log.Printf("Error getting tracer for project %s: %v", projectID, err) + writeErrorResponse(w, "Internal server error", http.StatusInternalServerError) + return + } + + tracer.AddSpanAttribute(ctx, "handler.name", "health") + + response := map[string]interface{}{ + "status": "healthy", + "timestamp": time.Now().UTC(), + "service": "user-api", + } + + writeSuccessResponse(w, response) +} + +func writeSuccessResponse(w http.ResponseWriter, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(Response{ + Success: true, + Data: data, + }) +} + +func writeErrorResponse(w http.ResponseWriter, message string, statusCode int) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + json.NewEncoder(w).Encode(Response{ + Success: false, + Error: message, + }) +} + +func myPostHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Debug: Check if we have an active span + span := trace.SpanFromContext(ctx) + fmt.Printf("Handler: Active span found: %t, Recording: %t\n", span != nil, span.IsRecording()) + + // Get project-specific tracer + projectID := r.Header.Get("x-project-id") + if projectID == "" { + projectID = "default" + } + + tracer, err := middleware.GetPulseTraceManager().GetTracer(projectID, "user-api") + if err != nil { + log.Printf("Error getting tracer for project %s: %v", projectID, err) + writeErrorResponse(w, "Internal server error", http.StatusInternalServerError) + return + } + + tracer.AddSpanAttribute(ctx, "handler.name", "createHandlerFunc") + + url := "http://myapp_2:8000/go_py_py" + + var reqBody struct { + Name string `json:"name"` + Price string `json:"price"` + ID string `json:"id"` + } + if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { + writeErrorResponse(w, "Invalid request body", http.StatusBadRequest) + return + } + + newBody, err := json.Marshal(map[string]string{ + "name": reqBody.Name, + "price": reqBody.Price, + "id": reqBody.ID, + }) + if err != nil { + tracer.RecordError(ctx, err) + writeErrorResponse(w, "Failed to marshal request body", http.StatusInternalServerError) + return + } + + // Simulate a POST request to the given URL using instrumented HTTP client + + // Create request with context to ensure trace propagation + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(newBody)) + if err != nil { + tracer.RecordError(ctx, err) + writeErrorResponse(w, "Failed to create request", http.StatusInternalServerError) + return + } + req.Header.Set("Content-Type", "application/json") + + fmt.Printf("Handler: Making HTTP request to %s\n", url) + // Use instrumented HTTP client for automatic span creation and propagation + resp, err := pulse_otel.GetInstrumentedHTTPClient().Do(req) + fmt.Printf("Handler: HTTP request completed, status: %v, error: %v\n", + func() string { + if resp != nil { + return fmt.Sprintf("%d", resp.StatusCode) + } else { + return "nil" + } + }(), err) + if err != nil { + tracer.RecordError(ctx, err) + writeErrorResponse(w, "Failed to make request", http.StatusInternalServerError) + return + } + defer resp.Body.Close() + + w.WriteHeader(resp.StatusCode) + json.NewEncoder(w).Encode(Response{ + Success: true, + Data: fmt.Sprintf("Request made to %s with status %d", url, resp.StatusCode), + }) +} diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/middleware.go b/examples/otel-collector/pulse_go_otel_instrumentor/middleware.go new file mode 100644 index 0000000..0eee7f5 --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/middleware.go @@ -0,0 +1,291 @@ +package pulse_otel + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" +) + +// HTTPMiddleware provides HTTP instrumentation middleware +type HTTPMiddleware struct { + pulseTraceManager *PulseTraceManager + serviceName string +} + +// NewHTTPMiddleware creates a new HTTP middleware with project support +func NewHTTPMiddleware(serviceName string, baseConfig *Config) *HTTPMiddleware { + // Set up global OpenTelemetry providers so instrumented libraries can use them + setupGlobalOTelProviders(baseConfig) + + return &HTTPMiddleware{ + pulseTraceManager: NewPulseTraceManager(baseConfig), + serviceName: serviceName, + } +} + +// setupGlobalOTelProviders configures global OpenTelemetry providers +func setupGlobalOTelProviders(baseConfig *Config) { + // Create a default tracer provider for global use + defaultProvider, err := NewPulseTraceManager(baseConfig).GetTracerProvider("default") + if err == nil { + // Set the global tracer provider so instrumented libraries can use it + otel.SetTracerProvider(defaultProvider.traceProvider) + } + + // Set the global text map propagator for context propagation + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) +} + +// GetPulseTraceManager returns the pulse trace manager instance +func (m *HTTPMiddleware) GetPulseTraceManager() *PulseTraceManager { + return m.pulseTraceManager +} + +// Shutdown gracefully shuts down the middleware and its project manager +func (m *HTTPMiddleware) Shutdown(ctx context.Context) error { + return m.pulseTraceManager.Shutdown(ctx) +} + +// Handler wraps an http.Handler with opentelemetry instrumentation +func (m *HTTPMiddleware) Handler(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Extract project ID from header first + projectID := r.Header.Get("singlestore-project-id") + + // If not found in header, check JSON body + if projectID == "" { + projectID = m.extractProjectIDFromBody(r) + } + + fmt.Println("Project ID extracted:", projectID) + + // Get project-specific tracer provider + provider, err := m.pulseTraceManager.GetTracerProvider(projectID) + if err != nil { + // Log error and use default behavior + fmt.Printf("Error getting project provider for %s: %v\n", projectID, err) + handler.ServeHTTP(w, r) + return + } + + // Check and update collector reachability safely + isCollectorReachable, err := m.pulseTraceManager.CheckAndUpdateCollectorReachability(projectID) + if err != nil { + fmt.Printf("Error checking collector reachability for project %s: %v\n", projectID, err) + handler.ServeHTTP(w, r) + return + } + + fmt.Println("isCollectorReachable:", isCollectorReachable) + if !isCollectorReachable { + // If collector is not reachable, skip tracing + handler.ServeHTTP(w, r) + return + } + + tracer := provider.traceProvider.Tracer(m.serviceName) + + // Extract any existing trace context from incoming request headers + ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header)) + + spanName := fmt.Sprintf("%s %s", r.Method, r.URL.Path) + ctx, span := tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes( + semconv.HTTPRoute(r.URL.Path), + attribute.String("project.id", projectID), + ), + ) + defer span.End() + + // IMPORTANT: Set the global tracer provider to the project-specific one + // This ensures that any instrumented library will use the correct tracer provider + otel.SetTracerProvider(provider.traceProvider) + + wrappedWriter := &responseWriter{ + ResponseWriter: w, + statusCode: http.StatusOK, + } + + // Execute the handler with the instrumented context + // The context now contains the active span that instrumented libraries can use + r = r.WithContext(ctx) + start := time.Now() + handler.ServeHTTP(wrappedWriter, r) + duration := time.Since(start) + + // Add response attributes + span.SetAttributes( + attribute.Float64("http.duration_ms", float64(duration.Nanoseconds())/1000000), + ) + + // Set span status based on HTTP status code + if wrappedWriter.statusCode >= 400 { + span.SetStatus(codes.Error, fmt.Sprintf("HTTP %d", wrappedWriter.statusCode)) + } + + // Inject trace context into response headers + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(w.Header())) + }) +} + +// extractProjectIDFromBody attempts to extract project-id from the request body +func (m *HTTPMiddleware) extractProjectIDFromBody(r *http.Request) string { + // Only check for project-id in POST/PUT/PATCH requests with JSON content + if r.Method != http.MethodPost && r.Method != http.MethodPut && r.Method != http.MethodPatch { + return "" + } + + contentType := r.Header.Get("Content-Type") + if contentType != "application/json" && contentType != "application/json; charset=utf-8" { + return "" + } + + // Read the body + body, err := io.ReadAll(r.Body) + if err != nil { + return "" + } + + // Restore the body for the actual handler to use + r.Body = io.NopCloser(bytes.NewBuffer(body)) + + // Parse JSON to extract project-id + var jsonData map[string]interface{} + if err := json.Unmarshal(body, &jsonData); err != nil { + return "" + } + + // Extract project-id from JSON + if projectID, exists := jsonData["project-id"]; exists { + if projectIDStr, ok := projectID.(string); ok { + return projectIDStr + } + } + + return "" +} + +func (m *HTTPMiddleware) HandlerFunc(handler http.HandlerFunc) http.HandlerFunc { + return m.Handler(handler).ServeHTTP +} + +// responseWriter wraps http.ResponseWriter to capture response details +type responseWriter struct { + http.ResponseWriter + statusCode int + bytesWritten int64 +} + +func (rw *responseWriter) WriteHeader(code int) { + rw.statusCode = code + rw.ResponseWriter.WriteHeader(code) +} + +func (rw *responseWriter) Write(data []byte) (int, error) { + n, err := rw.ResponseWriter.Write(data) + rw.bytesWritten += int64(n) + return n, err +} + +// GetInstrumentedHTTPClient returns an HTTP client that will automatically +// create spans for outgoing requests when used within a traced context +func GetInstrumentedHTTPClient() *http.Client { + return &http.Client{ + Transport: &InstrumentedTransport{ + base: http.DefaultTransport, + }, + } +} + +// InstrumentedTransport wraps http.RoundTripper to add automatic tracing +type InstrumentedTransport struct { + base http.RoundTripper +} + +// RoundTrip implements http.RoundTripper and automatically creates spans for HTTP requests +func (t *InstrumentedTransport) RoundTrip(req *http.Request) (*http.Response, error) { + ctx := req.Context() + + // Debug: Print context information + fmt.Printf("HTTP Client: Request URL: %s\n", req.URL.String()) + + // Get the active span from context (if any) + span := trace.SpanFromContext(ctx) + fmt.Printf("HTTP Client: Active span found: %t, Recording: %t\n", span != nil, span.IsRecording()) + + if !span.IsRecording() { + // No active span, just pass through + fmt.Println("HTTP Client: No recording span, passing through") + return t.base.RoundTrip(req) + } + + // Get tracer from the global tracer provider + tracer := otel.Tracer("http-client") + + // Create a new span for the HTTP request + spanName := fmt.Sprintf("HTTP %s %s", req.Method, req.URL.Host) + ctx, clientSpan := tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("http.method", req.Method), + attribute.String("http.url", req.URL.String()), + attribute.String("http.scheme", req.URL.Scheme), + attribute.String("http.host", req.URL.Host), + attribute.String("http.target", req.URL.Path), + ), + ) + defer clientSpan.End() + + fmt.Printf("HTTP Client: Created client span: %s\n", spanName) + + // Inject trace context into request headers for downstream propagation + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header)) + + // Update request with new context + req = req.WithContext(ctx) + + // Make the request + start := time.Now() + resp, err := t.base.RoundTrip(req) + duration := time.Since(start) + + // Add response attributes + clientSpan.SetAttributes( + attribute.Float64("http.duration_ms", float64(duration.Nanoseconds())/1000000), + ) + + if err != nil { + clientSpan.RecordError(err) + clientSpan.SetStatus(codes.Error, err.Error()) + return resp, err + } + + // Add response status + clientSpan.SetAttributes( + attribute.Int("http.status_code", resp.StatusCode), + ) + + // Set span status based on HTTP status code + if resp.StatusCode >= 400 { + clientSpan.SetStatus(codes.Error, fmt.Sprintf("HTTP %d", resp.StatusCode)) + } else { + clientSpan.SetStatus(codes.Ok, "") + } + + return resp, nil +} diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/tracer.go b/examples/otel-collector/pulse_go_otel_instrumentor/tracer.go new file mode 100644 index 0000000..43e9068 --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/tracer.go @@ -0,0 +1,183 @@ +package pulse_otel + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// Tracer wraps OpenTelemetry tracer with additional utilities +type Tracer struct { + tracer trace.Tracer + name string +} + +func NewTracer(name string) *Tracer { + return &Tracer{ + tracer: otel.Tracer(name), + name: name, + } +} + +// StartSpan starts a new span with the given name +func (t *Tracer) StartSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return t.tracer.Start(ctx, spanName, opts...) +} + +// StartSpanWithParent starts a new span with a parent span context +func (t *Tracer) StartSpanWithParent(parentCtx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return t.tracer.Start(parentCtx, spanName, opts...) +} + +// AddSpanAttributes adds attributes to the current span in context +func (t *Tracer) AddSpanAttributes(ctx context.Context, attributes ...attribute.KeyValue) { + span := trace.SpanFromContext(ctx) + if span != nil { + span.SetAttributes(attributes...) + } +} + +// AddSpanAttribute adds a single attribute to the current span in context +func (t *Tracer) AddSpanAttribute(ctx context.Context, key, value string) { + t.AddSpanAttributes(ctx, attribute.String(key, value)) +} + +// RecordError records an error in the current span +func (t *Tracer) RecordError(ctx context.Context, err error, opts ...trace.EventOption) { + span := trace.SpanFromContext(ctx) + if span != nil { + span.RecordError(err, opts...) + span.SetStatus(codes.Error, err.Error()) + } +} + +// SetSpanStatus sets the status of the current span +func (t *Tracer) SetSpanStatus(ctx context.Context, code codes.Code, description string) { + span := trace.SpanFromContext(ctx) + if span != nil { + span.SetStatus(code, description) + } +} + +// FinishSpan safely finishes a span +func (t *Tracer) FinishSpan(span trace.Span) { + if span != nil { + span.End() + } +} + +// WithSpan executes a function within a new span +func (t *Tracer) WithSpan(ctx context.Context, spanName string, fn func(context.Context) error, opts ...trace.SpanStartOption) error { + ctx, span := t.StartSpan(ctx, spanName, opts...) + defer span.End() + + if err := fn(ctx); err != nil { + t.RecordError(ctx, err) + return err + } + + return nil +} + +// GetTraceID returns the trace ID from the current context +func (t *Tracer) GetTraceID(ctx context.Context) string { + span := trace.SpanFromContext(ctx) + if span != nil { + return span.SpanContext().TraceID().String() + } + return "" +} + +// GetSpanID returns the span ID from the current context +func (t *Tracer) GetSpanID(ctx context.Context) string { + span := trace.SpanFromContext(ctx) + if span != nil { + return span.SpanContext().SpanID().String() + } + return "" +} + +// CreateChildSpan creates a child span from the current context +func (t *Tracer) CreateChildSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return t.tracer.Start(ctx, spanName, opts...) +} + +// WithSpanReturn executes a function within a new span and returns a value +func (t *Tracer) WithSpanReturn(ctx context.Context, spanName string, fn func(context.Context) (interface{}, error), opts ...trace.SpanStartOption) (interface{}, error) { + ctx, span := t.StartSpan(ctx, spanName, opts...) + defer span.End() + + result, err := fn(ctx) + if err != nil { + t.RecordError(ctx, err) + return result, err + } + + return result, nil +} + +// WithSpanReturnTyped executes a function within a new span and returns a typed value +func WithSpanReturnTyped[T any](t *Tracer, ctx context.Context, spanName string, fn func(context.Context) (T, error), opts ...trace.SpanStartOption) (T, error) { + ctx, span := t.StartSpan(ctx, spanName, opts...) + defer span.End() + + result, err := fn(ctx) + if err != nil { + t.RecordError(ctx, err) + return result, err + } + + return result, nil +} + +// SpanHelper provides convenient span operations +type SpanHelper struct { + span trace.Span + ctx context.Context +} + +// NewSpanHelper creates a new span helper +func NewSpanHelper(ctx context.Context, span trace.Span) *SpanHelper { + return &SpanHelper{ + span: span, + ctx: ctx, + } +} + +// AddAttribute adds an attribute to the span +func (s *SpanHelper) AddAttribute(key, value string) *SpanHelper { + s.span.SetAttributes(attribute.String(key, value)) + return s +} + +// AddAttributes adds multiple attributes to the span +func (s *SpanHelper) AddAttributes(attrs ...attribute.KeyValue) *SpanHelper { + s.span.SetAttributes(attrs...) + return s +} + +// SetStatus sets the span status +func (s *SpanHelper) SetStatus(code codes.Code, description string) *SpanHelper { + s.span.SetStatus(code, description) + return s +} + +// RecordError records an error +func (s *SpanHelper) RecordError(err error) *SpanHelper { + s.span.RecordError(err) + s.span.SetStatus(codes.Error, err.Error()) + return s +} + +// End ends the span +func (s *SpanHelper) End() { + s.span.End() +} + +// Context returns the span context +func (s *SpanHelper) Context() context.Context { + return s.ctx +} diff --git a/examples/otel-collector/pulse_go_otel_instrumentor/utils.go b/examples/otel-collector/pulse_go_otel_instrumentor/utils.go new file mode 100644 index 0000000..6f9c5df --- /dev/null +++ b/examples/otel-collector/pulse_go_otel_instrumentor/utils.go @@ -0,0 +1,25 @@ +package pulse_otel + +import ( + "net" + "net/url" + "time" +) + +// isReachable checks if the host:port endpoint is reachable within the timeout +func isReachable(endpoint string, timeout time.Duration) bool { + conn, err := net.DialTimeout("tcp", endpoint, timeout) + if err != nil { + return false + } + defer conn.Close() + return true +} + +func stripScheme(fullURL string) (string, error) { + parsed, err := url.Parse(fullURL) + if err != nil { + return "", err + } + return parsed.Host, nil +} diff --git a/examples/otel-collector/requirements.txt b/examples/otel-collector/requirements.txt index 0af110b..1ebfa48 100644 --- a/examples/otel-collector/requirements.txt +++ b/examples/otel-collector/requirements.txt @@ -1,3 +1,8 @@ fastapi uvicorn +python-dotenv +openai +requests +tenacity +pydantic diff --git a/examples/otel-collector/singlestore_pulse-0.3-py3-none-any.whl b/examples/otel-collector/singlestore_pulse-0.3-py3-none-any.whl new file mode 100644 index 0000000..5454784 Binary files /dev/null and b/examples/otel-collector/singlestore_pulse-0.3-py3-none-any.whl differ diff --git a/pulse_otel/main.py b/pulse_otel/main.py index 3301f60..b4601cc 100644 --- a/pulse_otel/main.py +++ b/pulse_otel/main.py @@ -319,58 +319,34 @@ def wrapper(*args, **kwargs): def observe(name): - """ - A decorator factory that instruments a function for observability using opentelemetry tracing. - Args: - name (str): The name of the span to be created for tracing. - Returns: - Callable: A decorator that wraps the target function, extracting opentelemetry tracing context - from the incoming request (if available), and starts a new tracing span using - the provided name. If no context is found, a new span is started without context. - Behavior: - - Adds session ID to span attributes if available in kwargs. - - Attempts to extract a tracing context from the 'request' argument or from positional arguments. - - Starts a tracing span with the extracted context (if present) or as a new trace. - - Logs debug information about the tracing context and span creation. - - Supports usage both within and outside of HTTP request contexts. - Example: - @observe("my_function_span") - def my_function(request: Request, ...): - ... - """ def decorator(func): - decorated_func = agent(name)(func) - logger.debug("Decorating function with observe:", name) - + print("Decorating:", func.__name__) @functools.wraps(func) def wrapper(*args, **kwargs): - add_session_id_to_span_attributes(**kwargs) - request: Request = kwargs.get("request") + print(f"[observe] wrapper called for {func.__name__}") + request = kwargs.get("request") + print(f"request: {request}") + print(f"kwargs: {kwargs}") if request is None: for arg in args: if isinstance(arg, Request): request = arg break - # Extract context from request if available - ctx = extract(request.headers) if request else None - - if ctx: - logger.debug(f"Starting span with context: {ctx}") - # Start span with context - with tracer.start_as_current_span(name, context=ctx, kind=SpanKind.SERVER): - return decorated_func(*args, **kwargs) + if request: + print(f"[observe] Request headers: {request.headers}") + ctx = extract(request.headers) if request else None + if ctx: + print(f"[observe] Extracted context: {ctx}") + with tracer.start_as_current_span(name, context=ctx, kind=SpanKind.SERVER): + return func(*args, **kwargs) + else: + print("[observe] No context found in request headers.") else: - logger.debug("No context found, starting span without context.") - - # Start span without context - # This is useful for cases where we want to start a span without any specific context - # e.g., when the function is called outside of an HTTP request context - # or when we want to create a fresh new trace or context is not properly propagated. - return decorated_func(*args, **kwargs) + print("[observe] No request found.") + return func(*args, **kwargs) return wrapper - return decorator class CustomFileSpanExporter(SpanExporter):