Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
]
license = { text = "BSD-3-Clause" }
requires-python = ">=3.10"
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
dependencies = ["zocalo","marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ]

[project.urls]
Download = "https://github.com/DiamondLightSource/python-workflows/releases"
Expand Down
4 changes: 4 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ pytest-mock==3.14.0
pytest-timeout==2.3.1
stomp-py==8.1.2
websocket-client==1.8.0
opentelemetry-api==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
marshmallow
33 changes: 33 additions & 0 deletions src/workflows/recipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from collections.abc import Callable
from typing import Any

from opentelemetry import trace

from workflows.recipe.recipe import Recipe
from workflows.recipe.validate import validate_recipe
from workflows.recipe.wrapper import RecipeWrapper
Expand Down Expand Up @@ -69,6 +71,37 @@ def unwrap_recipe(header, message):
message = mangle_for_receiving(message)
if header.get("workflows-recipe") in {True, "True", "true", 1}:
rw = RecipeWrapper(message=message, transport=transport_layer)

# Extract recipe_id on the current span
span = trace.get_current_span()
recipe_id = None

# Extract recipe ID from environment
if isinstance(message, dict):
environment = message.get("environment", {})
if isinstance(environment, dict):
recipe_id = environment.get("ID")

if recipe_id:
span.set_attribute("recipe_id", recipe_id)
span.add_event(
"recipe.id_extracted", attributes={"recipe_id": recipe_id}
)

# Extract span_id and trace_id for logging
span_context = span.get_span_context()
if span_context and span_context.is_valid:
span_id = format(span_context.span_id, "016x")
trace_id = format(span_context.trace_id, "032x")

log_extra = {
"span_id": span_id,
"trace_id": trace_id,
}

if recipe_id:
log_extra["recipe_id"] = recipe_id

if log_extender and rw.environment and rw.environment.get("ID"):
with log_extender("recipe_ID", rw.environment["ID"]):
return callback(rw, header, message.get("payload"))
Expand Down
41 changes: 41 additions & 0 deletions src/workflows/services/common_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@
import time
from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

import workflows
import workflows.logging
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
from workflows.util.zocalo.configuration import OTEL


class Status(enum.Enum):
Expand Down Expand Up @@ -185,6 +193,39 @@
self.transport.subscription_callback_set_intercept(
self._transport_interceptor
)

# Configure OTELTracing if configuration is available
otel_config = (
OTEL.config if hasattr(OTEL, "config") and OTEL.config else None
)

if otel_config:
# Configure OTELTracing
resource = Resource.create(
{
SERVICE_NAME: self._service_name,
}
)

self.log.debug("Configuring OTELTracing")
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

# Configure BatchProcessor and OTLPSpanExporter using config values
otlp_exporter = OTLPSpanExporter(
endpoint=otel_config["endpoint"],
timeout=otel_config.get("timeout", 10),
)
span_processor = BatchSpanProcessor(otlp_exporter)

Check failure

Code scanning / CodeQL

Potentially uninitialized local variable Error

Local variable 'otlp_exporter' may be used before it is initialized.
provider.add_span_processor(span_processor)

Check failure

Code scanning / CodeQL

Potentially uninitialized local variable Error

Local variable 'provider' may be used before it is initialized.

# Add OTELTracingMiddleware to the transport layer
tracer = trace.get_tracer(__name__)
otel_middleware = OTELTracingMiddleware(
tracer, service_name=self._service_name
)
self._transport.add_middleware(otel_middleware)

metrics = self._environment.get("metrics")
if metrics:
import prometheus_client
Expand Down
34 changes: 34 additions & 0 deletions src/workflows/transport/middleware/otel_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import functools
from collections.abc import Callable

from opentelemetry import trace
from opentelemetry.propagate import extract

from workflows.transport.middleware import BaseTransportMiddleware


class OTELTracingMiddleware(BaseTransportMiddleware):
def __init__(self, tracer: trace.Tracer, service_name: str):
self.tracer = tracer
self.service_name = service_name

def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
@functools.wraps(callback)
def wrapped_callback(header, message):
# Extract trace context from message headers
ctx = extract(header) if header else None

# Start a new span with the extracted context
with self.tracer.start_as_current_span(
"transport.subscribe", context=ctx
) as span:
span.set_attribute("service_name", self.service_name)
span.set_attribute("channel", channel)

# Call the original callback
return callback(header, message)

# Call the next middleware with the wrapped callback
return call_next(channel, wrapped_callback, **kwargs)
26 changes: 26 additions & 0 deletions src/workflows/util/zocalo/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,32 @@
from workflows.transport.stomp_transport import StompTransport


class OTEL:
"""A Zocalo configuration plugin to pre-populate OTELTracing config defaults"""

class Schema(PluginSchema):
host = fields.Str(required=True)
port = fields.Int(required=True)
endpoint = fields.Str(required=False)
timeout = fields.Int(required=False, load_default=10)

# Store configuration for access by services
config = {}

@staticmethod
def activate(configuration):
# Build the full endpoint URL if not provided
if "endpoint" not in configuration:
endpoint = (
f"https://{configuration['host']}:{configuration['port']}/v1/traces"
)
else:
endpoint = configuration["endpoint"]

OTEL.config["endpoint"] = endpoint
OTEL.config["timeout"] = configuration.get("timeout", 10)


class Stomp:
"""A Zocalo configuration plugin to pre-populate StompTransport config defaults"""

Expand Down
Loading