From d0c2bdb3f63db7ed6cd4d0a02a9cfdfd09e08a7a Mon Sep 17 00:00:00 2001 From: thomas <18520168+yaythomas@users.noreply.github.com> Date: Mon, 29 Jun 2026 11:53:40 -0700 Subject: [PATCH] fix: don't surface suspend as a FAILED outcome A user function that suspends (for example a child context that waits) was reported to instrumentation plugins as a FAILED outcome, so the OTEL plugin recorded the suspend as a span error and X-Ray surfaced a fault on the child-context span. The execution itself behaved correctly; only the instrumentation labeling was wrong. Drop the plugin notification on suspend so that on_user_function_end no longer fires for the suspending invocation. The OTEL plugin's existing on_invocation_end sweep already ends all remaining operation spans cleanly (no status, no fault). Also drop durable.attempt.number and durable.attempt.outcome from CONTEXT spans. These per-attempt fields are meaningful for STEP (each retry is an attempt) but not for CONTEXT, and dropping them aligns the emitted attributes across SDKs. STEP spans are unchanged. No public plugin API change: UserFunctionOutcome retains the SUCCEEDED and FAILED values plugins already used. Closes #491 --- .../plugin.py | 17 +++---- .../tests/test_plugin.py | 45 +++++++++++++++++++ .../plugin.py | 13 +++--- .../aws_durable_execution_sdk_python/state.py | 8 +--- .../tests/plugin_test.py | 24 ++++++++++ .../tests/state_test.py | 42 +++++++++++++++++ 6 files changed, 126 insertions(+), 23 deletions(-) diff --git a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py index 01ac034..e9ae356 100644 --- a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py +++ b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py @@ -453,11 +453,8 @@ def on_user_function_end(self, info: UserFunctionEndInfo) -> None: else "Unknown error" ) ) - elif info.outcome is UserFunctionOutcome.SUCCEEDED: - span.set_status(StatusCode.OK) else: - # PENDING - span.set_status(StatusCode.UNSET, "PENDING") + span.set_status(StatusCode.OK) end_timestamp = info.end_time if end_timestamp is not None and end_timestamp == info.start_time: @@ -493,9 +490,13 @@ def _extract_attributes(self, info: Any) -> dict[str, str]: attributes["durable.operation.type"] = info.operation_type.value if hasattr(info, "name") and info.name is not None: attributes["durable.operation.name"] = info.name - if hasattr(info, "attempt") and info.attempt is not None: - attributes["durable.attempt.number"] = info.attempt - if hasattr(info, "outcome") and info.outcome is not None: - attributes["durable.attempt.outcome"] = info.outcome.value + # Per-attempt fields are meaningful for STEP (each attempt is retried) + # but not for CONTEXT (a context is entered once per invocation, not + # retried). Omit them on CONTEXT spans for cross-SDK consistency. + if getattr(info, "operation_type", None) is not OperationType.CONTEXT: + if hasattr(info, "attempt") and info.attempt is not None: + attributes["durable.attempt.number"] = info.attempt + if hasattr(info, "outcome") and info.outcome is not None: + attributes["durable.attempt.outcome"] = info.outcome.value return attributes diff --git a/packages/aws-durable-execution-sdk-python-otel/tests/test_plugin.py b/packages/aws-durable-execution-sdk-python-otel/tests/test_plugin.py index 7398aa9..771547f 100644 --- a/packages/aws-durable-execution-sdk-python-otel/tests/test_plugin.py +++ b/packages/aws-durable-execution-sdk-python-otel/tests/test_plugin.py @@ -269,6 +269,51 @@ def test_user_function_callbacks_emit_attempt_span_attributes(): ) +def test_context_span_omits_attempt_attributes(): + """CONTEXT operations do not carry per-attempt attributes. + + durable.attempt.number and durable.attempt.outcome are meaningful for + STEP operations (each retry is an attempt) but not for CONTEXT, so the + plugin omits them on CONTEXT spans for cross-SDK consistency. + """ + plugin, exporter = _create_plugin() + plugin.on_invocation_start(_invocation_start_info()) + operation_id = "ctx-1" + + plugin.on_user_function_start( + UserFunctionStartInfo( + operation_id=operation_id, + operation_type=OperationType.CONTEXT, + sub_type=None, + name="book-trip", + parent_id=None, + start_time=START_TIME, + is_replay_children=False, + attempt=1, + ) + ) + plugin.on_user_function_end( + UserFunctionEndInfo( + operation_id=operation_id, + operation_type=OperationType.CONTEXT, + sub_type=None, + name="book-trip", + parent_id=None, + start_time=START_TIME, + is_replay_children=False, + attempt=1, + outcome=UserFunctionOutcome.SUCCEEDED, + end_time=END_TIME, + error=None, + ) + ) + + span = exporter.get_finished_spans()[0] + assert span.attributes["durable.operation.type"] == OperationType.CONTEXT.value + assert "durable.attempt.number" not in span.attributes + assert "durable.attempt.outcome" not in span.attributes + + def test_span_registry_helpers_can_be_called_from_multiple_threads(): """Verify active span registry helpers are safe under concurrent access.""" plugin, _ = _create_plugin() diff --git a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py index dd09bc1..34b7529 100644 --- a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py +++ b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import contextlib import datetime import functools @@ -7,7 +9,6 @@ from enum import Enum from typing import Any, Callable, MutableMapping -from aws_durable_execution_sdk_python.exceptions import SuspendExecution from aws_durable_execution_sdk_python.identifier import OperationIdentifier from aws_durable_execution_sdk_python.lambda_service import ( DurableExecutionInvocationOutput, @@ -51,16 +52,12 @@ class OperationEndInfo(OperationInfo): class UserFunctionOutcome(Enum): SUCCEEDED = "SUCCEEDED" FAILED = "FAILED" - PENDING = "PENDING" @classmethod - def from_error(cls, error: ErrorObject | None) -> "UserFunctionOutcome": + def from_error(cls, error: ErrorObject | None) -> UserFunctionOutcome: if error is None: return cls(cls.SUCCEEDED) - elif error.type == SuspendExecution.__name__: - return cls(cls.PENDING) - else: - return cls(cls.FAILED) + return cls(cls.FAILED) @dataclass(frozen=True) @@ -86,7 +83,7 @@ class UserFunctionEndInfo(OperationInfo): @classmethod def from_start_info( cls, start_info: UserFunctionStartInfo, error: ErrorObject | None - ) -> "UserFunctionEndInfo": + ) -> UserFunctionEndInfo: return UserFunctionEndInfo( operation_id=start_info.operation_id, operation_type=start_info.operation_type, diff --git a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/state.py b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/state.py index c24bd96..9996591 100644 --- a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/state.py +++ b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/state.py @@ -952,13 +952,7 @@ def wrapper(*args, **kwargs): result = user_function(*args, **kwargs) self._plugin_executor.on_user_function_end(start_info, None) return result - except SuspendExecution as e: - self._plugin_executor.on_user_function_end( - start_info, - ErrorObject( - type=type(e).__name__, message=None, data=None, stack_trace=None - ), - ) + except SuspendExecution: raise except Exception as e: self._plugin_executor.on_user_function_end( diff --git a/packages/aws-durable-execution-sdk-python/tests/plugin_test.py b/packages/aws-durable-execution-sdk-python/tests/plugin_test.py index c402d35..86379c0 100644 --- a/packages/aws-durable-execution-sdk-python/tests/plugin_test.py +++ b/packages/aws-durable-execution-sdk-python/tests/plugin_test.py @@ -783,5 +783,29 @@ def on_operation_attempt_end(self, info): # endregion Helper Classes +# region Suspend Outcome Tests +class TestUserFunctionOutcomeValues(unittest.TestCase): + def test_outcome_values(self): + self.assertEqual( + {o.value for o in UserFunctionOutcome}, + {"SUCCEEDED", "FAILED"}, + ) + + +class TestUserFunctionOutcomeFromError(unittest.TestCase): + def test_none_error_is_succeeded(self): + self.assertEqual( + UserFunctionOutcome.from_error(None), UserFunctionOutcome.SUCCEEDED + ) + + def test_error_is_failed(self): + self.assertEqual( + UserFunctionOutcome.from_error(ERROR), UserFunctionOutcome.FAILED + ) + + +# endregion Suspend Outcome Tests + + if __name__ == "__main__": unittest.main() diff --git a/packages/aws-durable-execution-sdk-python/tests/state_test.py b/packages/aws-durable-execution-sdk-python/tests/state_test.py index f026d35..9362f7c 100644 --- a/packages/aws-durable-execution-sdk-python/tests/state_test.py +++ b/packages/aws-durable-execution-sdk-python/tests/state_test.py @@ -19,6 +19,7 @@ DurableApiErrorCategory, GetExecutionStateError, OrphanedChildException, + TimedSuspendExecution, ) from aws_durable_execution_sdk_python.identifier import OperationIdentifier from aws_durable_execution_sdk_python.lambda_service import ( @@ -41,6 +42,7 @@ from aws_durable_execution_sdk_python.plugin import ( DurableInstrumentationPlugin, PluginExecutor, + UserFunctionEndInfo, ) from aws_durable_execution_sdk_python.state import ( CheckpointBatcherConfig, @@ -4199,6 +4201,46 @@ def on_operation_end(self, info): executor.shutdown(wait=True) +def test_wrap_user_function_suspend_does_not_fire_end_hook(): + """A user function that suspends does not fire the end hook. + + Regression: a timed suspend (TimedSuspendExecution) raised inside a wrapped + user function (e.g. a child context that waits) must not be surfaced to + plugins as a FAILED outcome. The suspend is normal durable control flow, + and the plugin observes it by absence (no end hook fires), with the + instrumentation plugin's own per-invocation span sweep closing any open + spans cleanly at invocation end. + """ + captured: list[UserFunctionEndInfo] = [] + + class _CapturingPlugin(DurableInstrumentationPlugin): + def on_user_function_end(self, info: UserFunctionEndInfo) -> None: + captured.append(info) + + plugin_executor = PluginExecutor(plugins=[_CapturingPlugin()]) + with plugin_executor.run(): + state = ExecutionState( + durable_execution_arn="test_arn", + initial_checkpoint_token="token123", # noqa: S106 + operations={}, + service_client=create_autospec(spec=LambdaClient), + plugin_executor=plugin_executor, + ) + + def suspends(_: object) -> None: + raise TimedSuspendExecution.from_delay("waiting", 5) + + op_id = OperationIdentifier( + operation_id="op-1", sub_type=OperationSubType.STEP, name="step" + ) + wrapped = state.wrap_user_function(suspends, op_id, attempt=1) + + with pytest.raises(TimedSuspendExecution): + wrapped(None) + + assert captured == [] + + def test_plugin_executor_not_called_for_pending_operations(): """Test that plugin_executor.on_operation_update fires on_user_function_end for PENDING operations.""" mock_client = create_autospec(spec=LambdaClient)