diff --git a/datadog_lambda/patch.py b/datadog_lambda/patch.py
index 6d2af0dc..879f2bc3 100644
--- a/datadog_lambda/patch.py
+++ b/datadog_lambda/patch.py
@@ -31,6 +31,20 @@ def patch_all():
 
     if config.trace_enabled:
         patch_all_dd()
+        # AIDEV-NOTE: Until the aws_durable_execution_sdk_python integration
+        # ships in a stable ddtrace release, this branch wires it up directly.
+        # ddtrace.patch(aws_durable_execution_sdk_python=True) becomes a no-op
+        # against PyPI ddtrace because _monkey.py doesn't know the name yet.
+        # Remove once the integration is GA.
+        try:
+            from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (
+                patch as _patch_ade,
+            )
+
+            _patch_ade()
+            logger.debug("aws_durable_execution_sdk_python integration patched")
+        except Exception as e:
+            logger.debug("Failed to patch aws_durable_execution_sdk_python: %s", e)
     else:
         _patch_http()
         _ensure_patch_requests()
diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py
index 3c7d9f11..871aea69 100644
--- a/datadog_lambda/tracing.py
+++ b/datadog_lambda/tracing.py
@@ -621,6 +621,208 @@ def get_injected_authorizer_data(event, is_http_api) -> dict:
         logger.debug("Failed to check if invocated by an authorizer. error %s", e)
 
 
+def is_durable_execution_replay(event):
+    """
+    Check if this Lambda invocation is a durable execution replay.
+
+    A replay occurs when there are existing operations in InitialExecutionState,
+    meaning this invocation is resuming from a previous checkpoint rather than
+    starting fresh.
+
+    For replay invocations, we should skip creating inferred spans because:
+    - The trace context is being continued from the checkpoint
+    - Creating an inferred span would create a duplicate
+
+    Returns:
+        True if this is a replay invocation (should skip inferred span)
+        False if this is first invocation or not a durable execution
+    """
+    if not isinstance(event, dict):
+        return False
+
+    if "DurableExecutionArn" not in event:
+        return False
+
+    # "or" (not a .get default) so an explicit null value is also tolerated.
+    initial_state = event.get("InitialExecutionState") or {}
+    operations = initial_state.get("Operations") or []
+
+    # The SDK always includes the EXECUTION operation itself (1 operation on first invocation).
+    # A replay has >1 operations (the EXECUTION + previously completed operations).
+    # This aligns with the SDK's ReplayStatus logic in execution.py.
+    return len(operations) > 1
+
+
+_TRACE_CHECKPOINT_PREFIX = "_datadog_"
+
+
+def _extract_from_datadog_checkpoint(operations):
+    """Priority 1: highest-numbered ``_datadog_{N}`` STEP operation.
+
+    Returns a Context, or None if no usable checkpoint is present.
+    """
+    candidates = []
+    for operation in operations:
+        op_name = operation.get("Name")
+        if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX):
+            continue
+        suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX) :]
+        try:
+            number = int(suffix)
+        except ValueError:
+            continue
+        candidates.append((number, operation))
+
+    if not candidates:
+        return None
+
+    candidates.sort(key=lambda t: t[0])
+    _, operation = candidates[-1]
+
+    payload_str = (operation.get("StepDetails") or {}).get("Result")
+    if not payload_str:
+        return None
+
+    try:
+        payload = json.loads(payload_str)
+    except (ValueError, TypeError):
+        return None
+
+    if not isinstance(payload, dict):
+        return None
+
+    context = propagator.extract(payload)
+    if context and context.trace_id:
+        return context
+    return None
+
+
+def _extract_from_input_payload(operations):
+    """Priority 2: Datadog headers in the original event's InputPayload.
+
+    The first operation in ``InitialExecutionState.Operations`` is the EXECUTION
+    operation; its ``ExecutionDetails.InputPayload`` is the original event the
+    durable function was invoked with — immutable across replays. If the caller
+    embedded Datadog headers (typical for API Gateway, direct invoke from
+    another instrumented service, or a chained durable invoke), use them.
+    """
+    if not operations:
+        return None
+
+    first_op = operations[0]
+    payload_str = (first_op.get("ExecutionDetails") or {}).get("InputPayload")
+    if not payload_str:
+        return None
+
+    try:
+        payload = json.loads(payload_str)
+    except (ValueError, TypeError):
+        return None
+
+    if not isinstance(payload, dict):
+        return None
+
+    # Try the wrapping conventions used by other instrumented sources.
+    for carrier in (
+        payload.get("_datadog"),
+        payload.get("headers"),
+        payload,
+    ):
+        if not isinstance(carrier, dict):
+            continue
+        context = propagator.extract(carrier)
+        if context and context.trace_id:
+            return context
+    return None
+
+
+def extract_context_from_durable_execution(event, lambda_context):
+    """
+    Extract Datadog trace context from AWS Lambda Durable Execution event.
+
+    Two-tier priority:
+
+    1. Highest-numbered ``_datadog_{N}`` STEP checkpoint (set by the
+       ``aws_durable_execution_sdk_python`` integration on a prior invocation).
+    2. Datadog headers found in the original event's ``InputPayload`` —
+       carries upstream context when an instrumented service invoked us.
+
+    If neither yields a context, returns ``None`` and the caller falls through
+    to the rest of the extraction chain (Lambda context, X-Ray, etc.). On the
+    very first invocation of a durable execution with no upstream context, the
+    tracer will simply mint a fresh trace; subsequent invocations recover that
+    same trace via the priority-1 checkpoint.
+    """
+    try:
+        if not isinstance(event, dict):
+            return None
+
+        if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
+            return None
+
+        operations = (event.get("InitialExecutionState") or {}).get("Operations") or []
+
+        ctx = _extract_from_datadog_checkpoint(operations)
+        if ctx is not None:
+            return ctx
+
+        return _extract_from_input_payload(operations)
+
+    except Exception as e:
+        logger.debug("Failed to extract trace context from durable execution: %s", e)
+        return None
+
+
+def create_durable_execution_root_span(event):
+    """
+    Create the durable execution root span on the FIRST invocation only.
+
+    - First invocation (no prior operations): creates the root span and returns it.
+      The span gets a fresh random ``span_id`` from the tracer; dd-trace-py's
+      checkpoint writer reads that id back via the live span tree (grandparent
+      walk) and persists it so subsequent invocations parent off the same root.
+    - Replay invocations: returns ``None`` — trace context is restored from the
+      ``_datadog_{N}`` checkpoint by ``extract_context_from_durable_execution``.
+
+    Returns the root span (caller must call ``span.finish()`` when the invocation
+    ends), or ``None`` if not a durable execution or if this is a replay.
+    """
+    try:
+        if not isinstance(event, dict):
+            return None
+
+        execution_arn = event.get("DurableExecutionArn")
+        has_initial_state = "InitialExecutionState" in event
+        if not execution_arn or not has_initial_state:
+            return None
+
+        if is_durable_execution_replay(event):
+            return None
+
+        service_name = (
+            os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
+        )
+        resource = (
+            execution_arn.split(":")[-1] if ":" in execution_arn else execution_arn
+        )
+
+        span = tracer.trace(
+            "aws.durable-execution",
+            service=service_name,
+            resource=resource,
+            span_type="serverless",
+        )
+        if span is None:
+            return None
+
+        span.set_tag("durable.execution_arn", execution_arn)
+        return span
+
+    except Exception as e:
+        logger.debug("Failed to create durable execution root span: %s", e)
+        return None
+
+
 def extract_dd_trace_context(
     event, lambda_context, extractor=None, decode_authorizer_context: bool = True
 ):
@@ -634,6 +836,18 @@ def extract_dd_trace_context(
     trace_context_source = None
     event_source = parse_event_source(event)
 
+    # Check for AWS Lambda Durable Execution events first (before other checks)
+    # This ensures trace context is properly continued across durable invocations
+    durable_context = extract_context_from_durable_execution(event, lambda_context)
+    if _is_context_complete(durable_context):
+        logger.debug("Extracted Datadog trace context from durable execution")
+        dd_trace_context = durable_context
+        trace_context_source = TraceContextSource.EVENT
+        logger.debug(
+            "extracted dd trace context from durable execution: %s", dd_trace_context
+        )
+        return dd_trace_context, trace_context_source, event_source
+
     if extractor is not None:
         context = extract_context_custom_extractor(extractor, event, lambda_context)
     elif isinstance(event, (set, dict)) and "request" in event:
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 767816a5..83df4163 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -39,6 +39,8 @@
     create_inferred_span,
     InferredSpanInfo,
     is_authorizer_response,
+    is_durable_execution_replay,
+    create_durable_execution_root_span,
     tracer,
     propagator,
 )
@@ -149,6 +151,9 @@ def __init__(self, func):
             self.inferred_span = None
             self.response = None
             self.blocking_response = None
+            self.durable_root_span = None
+            self.is_durable = False
+            self.durable_status = None
 
             if config.profiling_enabled and profiler:
                 self.prof = profiler.Profiler(env=config.env, service=config.service)
@@ -269,7 +274,9 @@ def _before(self, event, context):
 
             if config.trace_enabled:
                 set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
-                if config.make_inferred_span:
+                # Skip inferred span for durable execution replays to avoid duplicates
+                # For replays, trace context comes from checkpoint, not from event trigger
+                if config.make_inferred_span and not is_durable_execution_replay(event):
                     self.inferred_span = create_inferred_span(
                         event, context, event_source, config.decode_authorizer_context
                     )
@@ -277,6 +284,43 @@ def _before(self, event, context):
                 if config.appsec_enabled:
                     asm_set_context(event_source)
 
+                # For durable executions: create root span BEFORE aws.lambda span
+                # so aws.lambda becomes a child of the root durable execution span.
+                self.is_durable = (
+                    isinstance(event, dict) and "DurableExecutionArn" in event
+                )
+                if self.is_durable:
+                    # Set _reactivate on the active context so it persists after
+                    # all spans close. This prevents ddtrace from purging the
+                    # context when the last span (aws.lambda or root) finishes.
+                    # AIDEV-NOTE: _reactivate=True keeps the active Context alive
+                    # after all spans on it close. Without it, ddtrace purges the
+                    # context when the last span (aws.lambda or the durable root)
+                    # finishes, and any later trace-checkpoint write would lose
+                    # the propagation parent. Python-only fix; no JS analog.
+                    active_ctx = tracer.context_provider.active()
+                    if active_ctx and hasattr(active_ctx, "_reactivate"):
+                        active_ctx._reactivate = True
+
+                    # For replay: copy _meta from extracted context for _dd.p.* tag propagation
+                    # set_dd_trace_py_root only copies trace_id/span_id/sampling_priority,
+                    # so propagation tags from the checkpoint would be lost without this.
+                    if (
+                        dd_context
+                        and hasattr(dd_context, "_meta")
+                        and active_ctx
+                        and hasattr(active_ctx, "_meta")
+                    ):
+                        for k, v in dd_context._meta.items():
+                            if k not in active_ctx._meta:
+                                active_ctx._meta[k] = v
+
+                    # Component 1: Create root span (first invocation only)
+                    # Component 4: On replays, returns None (context from checkpoint)
+                    self.durable_root_span = create_durable_execution_root_span(event)
+
+                # Create aws.lambda span — child of root durable span (first invocation)
+                # or child of checkpoint context (replay), or normal parent otherwise
                 self.span = create_function_execution_span(
                     context=context,
                     function_name=config.function_name,
@@ -289,9 +333,11 @@ def _before(self, event, context):
                     parent_span=self.inferred_span,
                     span_pointers=calculate_span_pointers(event_source, event),
                 )
+
                 if config.appsec_enabled:
                     asm_start_request(self.span, event, event_source, self.trigger_tags)
                     self.blocking_response = get_asm_blocked_response(self.event_source)
+
             else:
                 set_correlation_ids()
             if config.profiling_enabled and profiler and is_new_sandbox():
@@ -318,6 +364,14 @@ def _after(self, event, context):
             if should_trace_cold_start:
                 trace_ctx = tracer.current_trace_context()
 
+            # Recompute on every invocation so a status from a previous
+            # invocation in the same sandbox can never be tagged onto this one.
+            self.durable_status = None
+            if self.is_durable:
+                self.durable_status = extract_durable_execution_status(
+                    self.response, event
+                )
+
             if self.span:
                 if config.appsec_enabled and not self.blocking_response:
                     asm_start_response(
@@ -343,15 +397,22 @@ def _after(self, event, context):
                 if status_code:
                     self.span.set_tag("http.status_code", status_code)
 
-                durable_status = extract_durable_execution_status(self.response, event)
-                if durable_status:
+                if self.durable_status:
                     self.span.set_tag(
                         "aws_lambda.durable_function.execution_status",
-                        durable_status,
+                        self.durable_status,
                     )
 
                 self.span.finish()
 
+            # Finish durable execution root span LAST, after aws.lambda.
+            # The trace-context checkpoint (for cross-invocation continuity)
+            # is saved by dd-trace-py's aws_durable_execution_sdk_python
+            # integration when the aws.durable_execution.execute span closes.
+            if self.durable_root_span:
+                self.durable_root_span.finish()
+                self.durable_root_span = None
+
             if status_code:
                 self.trigger_tags["http.status_code"] = status_code
                 mark_trace_as_error_for_5xx_responses(context, status_code, self.span)
diff --git a/tests/test_durable.py b/tests/test_durable.py
index 36a3e8c5..d06a3cc0 100644
--- a/tests/test_durable.py
+++ b/tests/test_durable.py
@@ -176,3 +176,150 @@ def test_returns_none_for_none_response(self):
             "DurableExecutionArn": "arn:aws:lambda:us-east-1:123:function:f:1/durable-execution/n/id"
         }
         self.assertIsNone(extract_durable_execution_status(None, event))
+
+
+import json
+
+from datadog_lambda.tracing import (
+    create_durable_execution_root_span,
+    extract_context_from_durable_execution,
+    is_durable_execution_replay,
+)
+
+
+_TEST_ARN = "arn:aws:lambda:us-east-2:1:function:f:1/durable-execution/wf/abc-123"
+
+
+def _event(operations):
+    return {
+        "DurableExecutionArn": _TEST_ARN,
+        "InitialExecutionState": {"Operations": operations},
+    }
+
+
+def _execution_op(input_payload=None):
+    op = {"OperationType": "EXECUTION", "Name": "execution"}
+    if input_payload is not None:
+        op["ExecutionDetails"] = {"InputPayload": input_payload}
+    return op
+
+
+def _trace_checkpoint_op(n, headers):
+    return {
+        "OperationType": "STEP",
+        "Id": f"id-{n}",
+        "Name": f"_datadog_{n}",
+        "StepDetails": {"Result": json.dumps(headers)},
+    }
+
+
+class TestExtractContextPriorityOne(unittest.TestCase):
+    """Highest-numbered ``_datadog_{N}`` STEP wins."""
+
+    def test_returns_context_from_latest_checkpoint(self):
+        ev = _event(
+            [
+                _execution_op(),
+                _trace_checkpoint_op(
+                    0,
+                    {
+                        "x-datadog-trace-id": "111",
+                        "x-datadog-parent-id": "222",
+                        "x-datadog-sampling-priority": "1",
+                    },
+                ),
+                _trace_checkpoint_op(
+                    1,
+                    {
+                        "x-datadog-trace-id": "111",
+                        "x-datadog-parent-id": "333",
+                        "x-datadog-sampling-priority": "1",
+                    },
+                ),
+            ]
+        )
+        ctx = extract_context_from_durable_execution(ev, None)
+        self.assertEqual(ctx.trace_id, 111)
+        # Latest checkpoint (N=1) wins.
+        self.assertEqual(ctx.span_id, 333)
+
+
+class TestExtractContextPriorityTwo(unittest.TestCase):
+    """When no checkpoint exists, fall back to the original event payload."""
+
+    def test_extracts_from_input_payload_headers_field(self):
+        upstream_headers = {
+            "x-datadog-trace-id": "777",
+            "x-datadog-parent-id": "888",
+            "x-datadog-sampling-priority": "1",
+        }
+        input_payload = json.dumps({"headers": upstream_headers, "body": "..."})
+        ev = _event([_execution_op(input_payload)])
+        ctx = extract_context_from_durable_execution(ev, None)
+        self.assertEqual(ctx.trace_id, 777)
+        self.assertEqual(ctx.span_id, 888)
+
+    def test_extracts_from_input_payload_underscore_datadog_field(self):
+        upstream_headers = {
+            "x-datadog-trace-id": "999",
+            "x-datadog-parent-id": "111",
+            "x-datadog-sampling-priority": "1",
+        }
+        input_payload = json.dumps({"_datadog": upstream_headers})
+        ev = _event([_execution_op(input_payload)])
+        ctx = extract_context_from_durable_execution(ev, None)
+        self.assertEqual(ctx.trace_id, 999)
+
+
+class TestExtractContextReturnsNoneWhenNoUpstream(unittest.TestCase):
+    """No checkpoint and no upstream headers → return None and let the rest
+    of the extraction chain run. The tracer mints a fresh trace on the first
+    invocation; subsequent invocations recover it via the priority-1 checkpoint.
+    """
+
+    def test_returns_none_when_only_execution_op(self):
+        ev = _event([_execution_op()])
+        self.assertIsNone(extract_context_from_durable_execution(ev, None))
+
+    def test_returns_none_when_input_payload_has_no_dd_headers(self):
+        ev = _event([_execution_op(json.dumps({"some": "user-event"}))])
+        self.assertIsNone(extract_context_from_durable_execution(ev, None))
+
+
+class TestIsDurableExecutionReplay(unittest.TestCase):
+    def test_first_invocation_is_not_replay(self):
+        self.assertFalse(is_durable_execution_replay(_event([_execution_op()])))
+
+    def test_second_invocation_is_replay(self):
+        ev = _event([_execution_op(), _trace_checkpoint_op(0, {})])
+        self.assertTrue(is_durable_execution_replay(ev))
+
+    def test_non_durable_event_is_not_replay(self):
+        self.assertFalse(is_durable_execution_replay({"body": "..."}))
+
+    def test_null_initial_state_is_not_replay(self):
+        ev = {"DurableExecutionArn": _TEST_ARN, "InitialExecutionState": None}
+        self.assertFalse(is_durable_execution_replay(ev))
+
+
+class TestCreateDurableExecutionRootSpan(unittest.TestCase):
+    def test_returns_none_on_replay(self):
+        ev = _event([_execution_op(), _trace_checkpoint_op(0, {})])
+        self.assertIsNone(create_durable_execution_root_span(ev))
+
+    def test_returns_none_for_non_durable_event(self):
+        self.assertIsNone(create_durable_execution_root_span({"body": "..."}))
+
+    def test_first_invocation_returns_a_span(self):
+        ev = _event([_execution_op()])
+        span = create_durable_execution_root_span(ev)
+        try:
+            self.assertIsNotNone(span)
+            # The span_id is whatever the tracer minted; dd-trace-py reads it
+            # back via a grandparent walk from the in-process span tree, so we
+            # don't assert any deterministic relationship to the ARN here.
+            self.assertGreater(span.span_id, 0)
+            self.assertEqual(span.get_tag("durable.execution_arn"), _TEST_ARN)
+        finally:
+            if span is not None:
+                span.finish()