diff --git a/pyproject.toml b/pyproject.toml
index 8d8313ab..9848750a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,7 +20,17 @@ classifiers = [
 ]
 license = { text = "BSD-3-Clause" }
 requires-python = ">=3.10"
-dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
+dependencies = [
+    "zocalo",
+    "marshmallow",
+    "bidict",
+    "pika",
+    "setuptools",
+    "stomp-py>=7",
+    "opentelemetry-api==1.20.0",
+    "opentelemetry-sdk==1.20.0",
+    "opentelemetry-exporter-otlp-proto-http==1.20.0",
+]
 
 [project.urls]
 Download = "https://github.com/DiamondLightSource/python-workflows/releases"
diff --git a/requirements_dev.txt b/requirements_dev.txt
index 8207c45b..bd711be7 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -7,3 +7,7 @@ pytest-mock==3.14.0
 pytest-timeout==2.3.1
 stomp-py==8.1.2
 websocket-client==1.8.0
+opentelemetry-api==1.20.0
+opentelemetry-sdk==1.20.0
+opentelemetry-exporter-otlp-proto-http==1.20.0
+marshmallow
diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py
index 0f1973f4..143d5705 100644
--- a/src/workflows/recipe/__init__.py
+++ b/src/workflows/recipe/__init__.py
@@ -5,6 +5,8 @@
 from collections.abc import Callable
 from typing import Any
 
+from opentelemetry import trace
+
 from workflows.recipe.recipe import Recipe
 from workflows.recipe.validate import validate_recipe
 from workflows.recipe.wrapper import RecipeWrapper
@@ -69,6 +71,22 @@ def unwrap_recipe(header, message):
         message = mangle_for_receiving(message)
         if header.get("workflows-recipe") in {True, "True", "true", 1}:
             rw = RecipeWrapper(message=message, transport=transport_layer)
+
+            # Tag the currently active span with the recipe ID so that a
+            # trace can be correlated with the recipe it belongs to.
+            span = trace.get_current_span()
+            recipe_id = None
+            if isinstance(message, dict):
+                environment = message.get("environment", {})
+                if isinstance(environment, dict):
+                    recipe_id = environment.get("ID")
+
+            if recipe_id:
+                span.set_attribute("recipe_id", recipe_id)
+                span.add_event(
+                    "recipe.id_extracted", attributes={"recipe_id": recipe_id}
+                )
+
             if log_extender and rw.environment and rw.environment.get("ID"):
                 with log_extender("recipe_ID", rw.environment["ID"]):
                     return callback(rw, header, message.get("payload"))
diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py
index de2ef704..c53b02a2 100644
--- a/src/workflows/services/common_service.py
+++ b/src/workflows/services/common_service.py
@@ -9,8 +9,16 @@
 import time
 from typing import Any
 
+from opentelemetry import trace
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
 import workflows
 import workflows.logging
+from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
+from workflows.util.zocalo.configuration import OTEL
 
 
 class Status(enum.Enum):
@@ -185,6 +193,30 @@ def start_transport(self):
             self.transport.subscription_callback_set_intercept(
                 self._transport_interceptor
             )
+
+            # Only enable tracing when the OTEL configuration plugin has
+            # been activated; OTEL.config is an empty dict otherwise.
+            otel_config = OTEL.config
+            if otel_config:
+                self.log.debug("Configuring OTELTracing")
+                resource = Resource.create({SERVICE_NAME: self._service_name})
+                provider = TracerProvider(resource=resource)
+                trace.set_tracer_provider(provider)
+
+                # Export finished spans in batches over OTLP/HTTP
+                otlp_exporter = OTLPSpanExporter(
+                    endpoint=otel_config["endpoint"],
+                    timeout=otel_config.get("timeout", 10),
+                )
+                provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
+
+                # Wrap subscription callbacks on this transport in spans
+                tracer = trace.get_tracer(__name__)
+                otel_middleware = OTELTracingMiddleware(
+                    tracer, service_name=self._service_name
+                )
+                self.transport.add_middleware(otel_middleware)
+
             metrics = self._environment.get("metrics")
             if metrics:
                 import prometheus_client
diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py
new file mode 100644
index 00000000..a3d791fb
--- /dev/null
+++ b/src/workflows/transport/middleware/otel_tracing.py
@@ -0,0 +1,38 @@
+from __future__ import annotations
+
+import functools
+from collections.abc import Callable
+
+from opentelemetry import trace
+from opentelemetry.propagate import extract
+
+from workflows.transport.middleware import BaseTransportMiddleware
+
+
+class OTELTracingMiddleware(BaseTransportMiddleware):
+    """Transport middleware that runs subscription callbacks inside a span."""
+
+    def __init__(self, tracer: trace.Tracer, service_name: str):
+        self.tracer = tracer
+        self.service_name = service_name
+
+    def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
+        """Subscribe to a channel with a callback wrapped in a tracing span.
+
+        Trace context extracted from a non-empty message header is passed as
+        the context of the span that is current while the callback runs.
+        """
+
+        @functools.wraps(callback)
+        def wrapped_callback(header, message):
+            # Extract trace context from message headers, if there are any
+            ctx = extract(header) if header else None
+
+            with self.tracer.start_as_current_span(
+                "transport.subscribe", context=ctx
+            ) as span:
+                span.set_attribute("service_name", self.service_name)
+                span.set_attribute("channel", channel)
+                return callback(header, message)
+
+        return call_next(channel, wrapped_callback, **kwargs)
diff --git a/src/workflows/util/zocalo/configuration.py b/src/workflows/util/zocalo/configuration.py
index 08a600aa..ca5d77fe 100644
--- a/src/workflows/util/zocalo/configuration.py
+++ b/src/workflows/util/zocalo/configuration.py
@@ -8,6 +8,37 @@
 from workflows.transport.stomp_transport import StompTransport
 
 
+class OTEL:
+    """A Zocalo configuration plugin to pre-populate OTELTracing config defaults"""
+
+    class Schema(PluginSchema):
+        host = fields.Str(required=True)
+        port = fields.Int(required=True)
+        endpoint = fields.Str(required=False)
+        timeout = fields.Int(required=False, load_default=10)
+
+    # Filled in by activate() and read by services when they set up tracing.
+    # Stays empty for as long as the plugin has not been activated.
+    config = {}
+
+    @staticmethod
+    def activate(configuration):
+        """Store the trace endpoint and timeout taken from the configuration.
+
+        An explicit ``endpoint`` entry is used as given; without one the
+        endpoint URL is built from ``host`` and ``port``.
+        """
+        if "endpoint" not in configuration:
+            endpoint = (
+                f"https://{configuration['host']}:{configuration['port']}/v1/traces"
+            )
+        else:
+            endpoint = configuration["endpoint"]
+
+        OTEL.config["endpoint"] = endpoint
+        OTEL.config["timeout"] = configuration.get("timeout", 10)
+
+
 class Stomp:
     """A Zocalo configuration plugin to pre-populate StompTransport config defaults"""
 