From 722bf3e227f84f534fe7c1e5146df7aba305a146 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 13 Jan 2026 14:45:38 +0000 Subject: [PATCH 01/16] Add basic tracing middleware and global control --- src/workflows/services/common_service.py | 32 ++++++++++++++ .../transport/middleware/otel_tracing.py | 42 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/workflows/transport/middleware/otel_tracing.py diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index de2ef70..d79f7fb 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -12,6 +12,13 @@ import workflows import workflows.logging +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware +from opentelemetry.sdk.resources import Resource, SERVICE_NAME + class Status(enum.Enum): """ @@ -185,6 +192,31 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) + + # Configure OTELTracing + resource = Resource.create({ + SERVICE_NAME: self._service_name, + }) + + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + + # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector + otlp_exporter = OTLPSpanExporter( + endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", + timeout=10 + ) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) + + # Add OTELTracingMiddleware to the transport layer + tracer = trace.get_tracer(__name__) + otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) + self._transport.add_middleware(otel_middleware) + + self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) + metrics = self._environment.get("metrics") if metrics: import prometheus_client diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py new file mode 100644 index 0000000..af0a1a1 --- /dev/null +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -0,0 +1,42 @@ +from opentelemetry import trace +from workflows.transport.middleware import BaseTransportMiddleware +from collections.abc import Callable +import functools +from opentelemetry.propagate import inject + +class OTELTracingMiddleware(BaseTransportMiddleware): + def __init__(self, tracer: trace.Tracer, service_name: str): + """ + Initialize the OpenTelemetry Tracing Middleware. + + :param tracer: An OpenTelemetry tracer instance used to create spans. + """ + self.tracer = tracer + self.service_name = service_name + + + def send(self, call_next: Callable, destination, message, **kwargs): + """ + Middleware for tracing the `send` operation + + :param call_next: The next middleware or the original `send` method. + :param destination: The destination service to which the message is being sent. + :param message: The message being sent. + :param kwargs: Additional arguments for the `send` method. + """ + + # Start a new span for the `send` operation + with self.tracer.start_as_current_span("transport.send") as span: + # Attributes we're interested in + span.set_attribute("service_name", self.service_name) + span.set_attribute("destination", destination) + span.set_attribute("message", str(message)) + + # Inject trace context into message headers + headers = kwargs.setdefault("headers", {}) + inject(headers) + kwargs["headers"] = headers + + # Call the next middleware or the original `send` method + return call_next(destination, message, **kwargs) + From 52cb04d756370e00294ff3ad17f38020191b374a Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:05:03 +0000 Subject: [PATCH 02/16] Instrument on subscribe and add dcid to span attributes --- src/workflows/recipe/__init__.py | 34 ++++++++++++++ .../transport/middleware/otel_tracing.py | 46 ++++++++----------- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 0f1973f..5834bfe 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -3,6 +3,7 @@ import functools import logging from collections.abc import Callable +from opentelemetry import trace from typing import Any from workflows.recipe.recipe import Recipe @@ -69,6 +70,39 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) + print(rw) + logger.log(1, rw) + + # Extract and set DCID on the current span + span = trace.get_current_span() + dcid = None + + # Try multiple locations where DCID might be stored + top_level_params = {} + if isinstance(message, dict): + # Direct parameters (top-level or in recipe) + top_level_params = message.get("parameters", {}) + + # Payload parameters (most common location) + payload = message.get("payload", {}) + payload_params = {} + if isinstance(payload, dict): + payload_params = payload.get("parameters", {}) + + # Try all common locations + dcid = ( + top_level_params.get("ispyb_dcid") or + top_level_params.get("dcid") or + payload_params.get("ispyb_dcid") or + payload_params.get("dcid") or + payload.get("ispyb_dcid") or + payload.get("dcid") + ) + + if dcid: + span.set_attribute("dcid", dcid) + span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index af0a1a1..27e89db 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -2,7 +2,7 @@ from workflows.transport.middleware import BaseTransportMiddleware from collections.abc import Callable import functools -from opentelemetry.propagate import inject +from opentelemetry.propagate import inject, extract class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): @@ -14,29 +14,23 @@ def __init__(self, tracer: trace.Tracer, service_name: str): self.tracer = tracer self.service_name = service_name - - def send(self, call_next: Callable, destination, message, **kwargs): - """ - Middleware for tracing the `send` operation - - :param call_next: The next middleware or the original `send` method. - :param destination: The destination service to which the message is being sent. - :param message: The message being sent. - :param kwargs: Additional arguments for the `send` method. - """ - - # Start a new span for the `send` operation - with self.tracer.start_as_current_span("transport.send") as span: - # Attributes we're interested in - span.set_attribute("service_name", self.service_name) - span.set_attribute("destination", destination) - span.set_attribute("message", str(message)) + def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: + @functools.wraps(callback) + def wrapped_callback(header, message): + # Extract trace context from message headers + ctx = extract(header) if header else None - # Inject trace context into message headers - headers = kwargs.setdefault("headers", {}) - inject(headers) - kwargs["headers"] = headers - - # Call the next middleware or the original `send` method - return call_next(destination, message, **kwargs) - + # Start a new span with the extracted context + with self.tracer.start_as_current_span( + "transport.subscribe", + context=ctx + ) as span: + span.set_attribute("service_name", self.service_name) + span.set_attribute("channel", channel) + + + # Call the original callback + return callback(header, message) + + # Call the next middleware with the wrapped callback + return call_next(channel, wrapped_callback, **kwargs) \ No newline at end of file From cc9ee124f06ae3c98f556f9aafbbc35c81430f7c Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:10:45 +0000 Subject: [PATCH 03/16] Add spanid and traceid metadata to greylog --- src/workflows/recipe/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 5834bfe..abd5854 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -102,6 +102,20 @@ def unwrap_recipe(header, message): if dcid: span.set_attribute("dcid", dcid) span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + + # Extract span_id and trace_id for logging + span_context = span.get_span_context() + if span_context.is_valid: + span_id = format(span_context.span_id, '016x') + trace_id = format(span_context.trace_id, '032x') + + logger.info( + "Processing recipe message", + extra={ + "span_id": span_id, + "trace_id": trace_id, + } + ) if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): From f7cc6589b8af00e60544fae88e8919742b4e8499 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:29:04 +0000 Subject: [PATCH 04/16] Add recipe_id to spans --- src/workflows/recipe/__init__.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index abd5854..653ab61 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -70,13 +70,19 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) - print(rw) - logger.log(1, rw) + logger.debug("RecipeWrapper created: %s", rw) - # Extract and set DCID on the current span + # Extract and set DCID and recipe_id on the current span span = trace.get_current_span() dcid = None + recipe_id = None + # Extract recipe ID from environment + if isinstance(message, dict): + environment = message.get("environment", {}) + if isinstance(environment, dict): + recipe_id = environment.get("ID") + # Try multiple locations where DCID might be stored top_level_params = {} if isinstance(message, dict): @@ -103,18 +109,28 @@ def unwrap_recipe(header, message): span.set_attribute("dcid", dcid) span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + if recipe_id: + span.set_attribute("recipe_id", recipe_id) + span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) + # Extract span_id and trace_id for logging span_context = span.get_span_context() - if span_context.is_valid: + if span_context and span_context.is_valid: span_id = format(span_context.span_id, '016x') trace_id = format(span_context.trace_id, '032x') + log_extra = { + "span_id": span_id, + "trace_id": trace_id, + } + if dcid: + log_extra["dcid"] = dcid + if recipe_id: + log_extra["recipe_id"] = recipe_id + logger.info( "Processing recipe message", - extra={ - "span_id": span_id, - "trace_id": trace_id, - } + extra=log_extra ) if log_extender and rw.environment and rw.environment.get("ID"): From 8b2a2f1fa9d1a927a7b05d8ac986e1c71cba80a4 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:45:17 +0000 Subject: [PATCH 05/16] Add dev and prod dependencies --- pyproject.toml | 2 +- requirements_dev.txt | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8d8313a..ea974c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] license = { text = "BSD-3-Clause" } requires-python = ">=3.10" -dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"] +dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] [project.urls] Download = "https://github.com/DiamondLightSource/python-workflows/releases" diff --git a/requirements_dev.txt b/requirements_dev.txt index 8207c45..78c1d02 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -7,3 +7,6 @@ pytest-mock==3.14.0 pytest-timeout==2.3.1 stomp-py==8.1.2 websocket-client==1.8.0 +opentelemetry-api==1.20.0 +opentelemetry-sdk==1.20.0 +opentelemetry-exporter-otlp-proto-http==1.20.0 \ No newline at end of file From 0686e2824e75565eeaaca782b3dbfdaad38246ac Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 15:55:01 +0000 Subject: [PATCH 06/16] Remove dcid extract from message and inject to span logic. Will be added to python-zocalo --- src/workflows/recipe/__init__.py | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 653ab61..629cd08 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -72,7 +72,7 @@ def unwrap_recipe(header, message): rw = RecipeWrapper(message=message, transport=transport_layer) logger.debug("RecipeWrapper created: %s", rw) - # Extract and set DCID and recipe_id on the current span + # Extract recipe_id on the current span span = trace.get_current_span() dcid = None recipe_id = None @@ -83,32 +83,6 @@ def unwrap_recipe(header, message): if isinstance(environment, dict): recipe_id = environment.get("ID") - # Try multiple locations where DCID might be stored - top_level_params = {} - if isinstance(message, dict): - # Direct parameters (top-level or in recipe) - top_level_params = message.get("parameters", {}) - - # Payload parameters (most common location) - payload = message.get("payload", {}) - payload_params = {} - if isinstance(payload, dict): - payload_params = payload.get("parameters", {}) - - # Try all common locations - dcid = ( - top_level_params.get("ispyb_dcid") or - top_level_params.get("dcid") or - payload_params.get("ispyb_dcid") or - payload_params.get("dcid") or - payload.get("ispyb_dcid") or - payload.get("dcid") - ) - - if dcid: - span.set_attribute("dcid", dcid) - span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) - if recipe_id: span.set_attribute("recipe_id", recipe_id) span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) @@ -129,6 +103,7 @@ def unwrap_recipe(header, message): log_extra["recipe_id"] = recipe_id logger.info( + "Processing recipe message", extra=log_extra ) From 2d9e21c21080af1fbab429c3e5618071ffa02c25 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:24:53 +0000 Subject: [PATCH 07/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 19 ++++++----- src/workflows/services/common_service.py | 32 +++++++++++-------- .../transport/middleware/otel_tracing.py | 23 +++++++------ 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 629cd08..ab366f1 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -3,9 +3,10 @@ import functools import logging from collections.abc import Callable -from opentelemetry import trace from typing import Any +from opentelemetry import trace + from workflows.recipe.recipe import Recipe from workflows.recipe.validate import validate_recipe from workflows.recipe.wrapper import RecipeWrapper @@ -82,16 +83,18 @@ def unwrap_recipe(header, message): environment = message.get("environment", {}) if isinstance(environment, dict): recipe_id = environment.get("ID") - + if recipe_id: span.set_attribute("recipe_id", recipe_id) - span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) + span.add_event( + "recipe.id_extracted", attributes={"recipe_id": recipe_id} + ) # Extract span_id and trace_id for logging span_context = span.get_span_context() if span_context and span_context.is_valid: - span_id = format(span_context.span_id, '016x') - trace_id = format(span_context.trace_id, '032x') + span_id = format(span_context.span_id, "016x") + trace_id = format(span_context.trace_id, "032x") log_extra = { "span_id": span_id, @@ -102,12 +105,8 @@ def unwrap_recipe(header, message): if recipe_id: log_extra["recipe_id"] = recipe_id - logger.info( + logger.info("Processing recipe message", extra=log_extra) - "Processing recipe message", - extra=log_extra - ) - if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index d79f7fb..5aa8ee6 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -9,15 +9,15 @@ import time from typing import Any -import workflows -import workflows.logging - from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +import workflows +import workflows.logging from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware -from opentelemetry.sdk.resources import Resource, SERVICE_NAME class Status(enum.Enum): @@ -192,11 +192,13 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - + # Configure OTELTracing - resource = Resource.create({ - SERVICE_NAME: self._service_name, - }) + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) self.log.debug("Configuring OTELTracing") provider = TracerProvider(resource=resource) @@ -204,18 +206,22 @@ def start_transport(self): # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector otlp_exporter = OTLPSpanExporter( - endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", - timeout=10 + endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", timeout=10 ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) # Add OTELTracingMiddleware to the transport layer tracer = trace.get_tracer(__name__) - otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) + otel_middleware = OTELTracingMiddleware( + tracer, service_name=self._service_name + ) self._transport.add_middleware(otel_middleware) - self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) + self.log.debug( + "OTELTracingMiddleware added to transport layer of %s", + self._service_name, + ) metrics = self._environment.get("metrics") if metrics: diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index 27e89db..453ff6c 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -1,8 +1,13 @@ +from __future__ import annotations + +import functools +from collections.abc import Callable + from opentelemetry import trace +from opentelemetry.propagate import extract + from workflows.transport.middleware import BaseTransportMiddleware -from collections.abc import Callable -import functools -from opentelemetry.propagate import inject, extract + class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): @@ -19,18 +24,16 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: def wrapped_callback(header, message): # Extract trace context from message headers ctx = extract(header) if header else None - + # Start a new span with the extracted context with self.tracer.start_as_current_span( - "transport.subscribe", - context=ctx + "transport.subscribe", context=ctx ) as span: span.set_attribute("service_name", self.service_name) span.set_attribute("channel", channel) - - + # Call the original callback return callback(header, message) - + # Call the next middleware with the wrapped callback - return call_next(channel, wrapped_callback, **kwargs) \ No newline at end of file + return call_next(channel, wrapped_callback, **kwargs) From 3a5283ae292a83d168aabf10a70a5dac47a45468 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:52:27 +0000 Subject: [PATCH 08/16] Use plugin configurations to configure connection to OTELCollector --- src/workflows/services/common_service.py | 32 ++++++++++++---------- src/workflows/util/zocalo/configuration.py | 23 ++++++++++++++++ 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index d79f7fb..fc640dd 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -193,20 +193,24 @@ def start_transport(self): self._transport_interceptor ) - # Configure OTELTracing - resource = Resource.create({ - SERVICE_NAME: self._service_name, - }) - - self.log.debug("Configuring OTELTracing") - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) - - # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector - otlp_exporter = OTLPSpanExporter( - endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", - timeout=10 - ) + # Configure OTELTracing if configuration is available + otel_config = OTEL.config if hasattr(OTEL, 'config') and OTEL.config else None + + if otel_config: + # Configure OTELTracing + resource = Resource.create({ + SERVICE_NAME: self._service_name, + }) + + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + + # Configure BatchProcessor and OTLPSpanExporter using config values + otlp_exporter = OTLPSpanExporter( + endpoint=otel_config["endpoint"], + timeout=otel_config.get("timeout", 10) + ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) diff --git a/src/workflows/util/zocalo/configuration.py b/src/workflows/util/zocalo/configuration.py index 08a600a..79ff0fe 100644 --- a/src/workflows/util/zocalo/configuration.py +++ b/src/workflows/util/zocalo/configuration.py @@ -7,6 +7,29 @@ from workflows.transport.pika_transport import PikaTransport from workflows.transport.stomp_transport import StompTransport +class OTEL: + """A Zocalo configuration plugin to pre-populate OTELTracing config defaults""" + + class Schema(PluginSchema): + host = fields.Str(required=True) + port = fields.Int(required=True) + endpoint = fields.Str(required=False) + timeout = fields.Int(required=False, load_default=10) + + # Store configuration for access by services + config = {} + + @staticmethod + def activate(configuration): + # Build the full endpoint URL if not provided + if "endpoint" not in configuration: + endpoint = f"https://{configuration['host']}:{configuration['port']}/v1/traces" + else: + endpoint = configuration["endpoint"] + + OTEL.config["endpoint"] = endpoint + OTEL.config["timeout"] = configuration.get("timeout", 10) + class Stomp: """A Zocalo configuration plugin to pre-populate StompTransport config defaults""" From 4b999f15c3bbe5d5755348961bca2f19f17b2ea6 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:58:49 +0000 Subject: [PATCH 09/16] Remove vestigial dcid handling and unnecessary debug statements --- src/workflows/recipe/__init__.py | 11 +---------- src/workflows/services/common_service.py | 1 - 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 629cd08..b7449bb 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -70,11 +70,9 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) - logger.debug("RecipeWrapper created: %s", rw) # Extract recipe_id on the current span span = trace.get_current_span() - dcid = None recipe_id = None # Extract recipe ID from environment @@ -97,16 +95,9 @@ def unwrap_recipe(header, message): "span_id": span_id, "trace_id": trace_id, } - if dcid: - log_extra["dcid"] = dcid + if recipe_id: log_extra["recipe_id"] = recipe_id - - logger.info( - - "Processing recipe message", - extra=log_extra - ) if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index fc640dd..bc48331 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -219,7 +219,6 @@ def start_transport(self): otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) self._transport.add_middleware(otel_middleware) - self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) metrics = self._environment.get("metrics") if metrics: From 4b86715ec6cec2f5ca80c5b5ceb9f1d70025eabc Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:59:43 +0000 Subject: [PATCH 10/16] remove unhelpful docstring --- src/workflows/transport/middleware/otel_tracing.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index 27e89db..ff3e849 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -6,11 +6,6 @@ class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): - """ - Initialize the OpenTelemetry Tracing Middleware. - - :param tracer: An OpenTelemetry tracer instance used to create spans. - """ self.tracer = tracer self.service_name = service_name From 3e0b9029db0243b425f9380203fad819746885a6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 17:08:34 +0000 Subject: [PATCH 11/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 2 +- src/workflows/services/common_service.py | 19 +++++++++++-------- src/workflows/util/zocalo/configuration.py | 7 +++++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 59afa5e..143d570 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -101,7 +101,7 @@ def unwrap_recipe(header, message): if recipe_id: log_extra["recipe_id"] = recipe_id - + if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 5258a30..ddf39e8 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -192,15 +192,19 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - + # Configure OTELTracing if configuration is available - otel_config = OTEL.config if hasattr(OTEL, 'config') and OTEL.config else None - + otel_config = ( + OTEL.config if hasattr(OTEL, "config") and OTEL.config else None + ) + if otel_config: # Configure OTELTracing - resource = Resource.create({ - SERVICE_NAME: self._service_name, - }) + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) self.log.debug("Configuring OTELTracing") provider = TracerProvider(resource=resource) @@ -209,7 +213,7 @@ def start_transport(self): # Configure BatchProcessor and OTLPSpanExporter using config values otlp_exporter = OTLPSpanExporter( endpoint=otel_config["endpoint"], - timeout=otel_config.get("timeout", 10) + timeout=otel_config.get("timeout", 10), ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) @@ -221,7 +225,6 @@ def start_transport(self): ) self._transport.add_middleware(otel_middleware) - metrics = self._environment.get("metrics") if metrics: import prometheus_client diff --git a/src/workflows/util/zocalo/configuration.py b/src/workflows/util/zocalo/configuration.py index 79ff0fe..ca5d77f 100644 --- a/src/workflows/util/zocalo/configuration.py +++ b/src/workflows/util/zocalo/configuration.py @@ -7,6 +7,7 @@ from workflows.transport.pika_transport import PikaTransport from workflows.transport.stomp_transport import StompTransport + class OTEL: """A Zocalo configuration plugin to pre-populate OTELTracing config defaults""" @@ -23,10 +24,12 @@ class Schema(PluginSchema): def activate(configuration): # Build the full endpoint URL if not provided if "endpoint" not in configuration: - endpoint = f"https://{configuration['host']}:{configuration['port']}/v1/traces" + endpoint = ( + f"https://{configuration['host']}:{configuration['port']}/v1/traces" + ) else: endpoint = configuration["endpoint"] - + OTEL.config["endpoint"] = endpoint OTEL.config["timeout"] = configuration.get("timeout", 10) From d446e8099a4fb7e1a61a94207e56dfd20c7e5d84 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 17:16:49 +0000 Subject: [PATCH 12/16] imported OTEL config class to common_service --- src/workflows/services/common_service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index ddf39e8..62dc653 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -18,6 +18,8 @@ import workflows import workflows.logging from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware +from workflows.util.zocalo.configuration import OTEL + class Status(enum.Enum): From 16b0e10143a11f52c7eeec36b84866da52556082 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 17:17:01 +0000 Subject: [PATCH 13/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/services/common_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 62dc653..717b448 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -21,7 +21,6 @@ from workflows.util.zocalo.configuration import OTEL - class Status(enum.Enum): """ Internal service status codes From 7ad857fb3a9c457cd0c09913394d714ab92ea3aa Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 27 Jan 2026 14:28:21 +0000 Subject: [PATCH 14/16] add marshmallow dependency --- pyproject.toml | 2 +- requirements_dev.txt | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ea974c1..9ed046d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] license = { text = "BSD-3-Clause" } requires-python = ">=3.10" -dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] +dependencies = ["marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] [project.urls] Download = "https://github.com/DiamondLightSource/python-workflows/releases" diff --git a/requirements_dev.txt b/requirements_dev.txt index 78c1d02..bd711be 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -9,4 +9,5 @@ stomp-py==8.1.2 websocket-client==1.8.0 opentelemetry-api==1.20.0 opentelemetry-sdk==1.20.0 -opentelemetry-exporter-otlp-proto-http==1.20.0 \ No newline at end of file +opentelemetry-exporter-otlp-proto-http==1.20.0 +marshmallow \ No newline at end of file From 7aae664ce6f42f75f71b4ba79550938abfa48129 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 27 Jan 2026 14:45:17 +0000 Subject: [PATCH 15/16] add zocalo dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9ed046d..9848750 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ ] license = { text = "BSD-3-Clause" } requires-python = ">=3.10" -dependencies = ["marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] +dependencies = ["zocalo","marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ] [project.urls] Download = "https://github.com/DiamondLightSource/python-workflows/releases" From 902a7dfdcb9f80af32b3c3d091a71fd9258dfcef Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 27 Jan 2026 15:10:15 +0000 Subject: [PATCH 16/16] Fix possibly unbound error --- src/workflows/services/common_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index 717b448..c53b02a 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -216,8 +216,8 @@ def start_transport(self): endpoint=otel_config["endpoint"], timeout=otel_config.get("timeout", 10), ) - span_processor = BatchSpanProcessor(otlp_exporter) - provider.add_span_processor(span_processor) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) # Add OTELTracingMiddleware to the transport layer tracer = trace.get_tracer(__name__)