Skip to content

Otel: emit plugin operation hooks during replay#496

Merged
zhongkechen merged 2 commits into
mainfrom
replay-plugin-operation-hooks
Jun 30, 2026
Merged

Otel: emit plugin operation hooks during replay#496
zhongkechen merged 2 commits into
mainfrom
replay-plugin-operation-hooks

Conversation

@zhongkechen

Copy link
Copy Markdown
Contributor

Summary

This PR makes plugin operation lifecycle hooks observable during replay, not just when an operation update is checkpointed for the first time.

Previously, on_operation_start and on_operation_end were only dispatched from the checkpoint processing path:

  • on_operation_start fired when a START operation update was successfully checkpointed.
  • on_operation_end fired when a terminal operation state came back from checkpointing.
  • When a later Lambda invocation replayed already-checkpointed operations, the SDK read those operations from ExecutionState.get_checkpoint_result() and skipped execution, but no operation lifecycle plugin hooks were emitted.

That meant plugins could observe the original operation lifecycle but could not observe that the same logical operation was replayed.

What Changed

  • Added is_replayed: bool to OperationInfo.
  • Set is_replayed=False for normal, first-time operation hook dispatch.
  • Added replay hook dispatch from ExecutionState.get_checkpoint_result() while the state is in replay mode.
  • Added PluginExecutor.on_operation_replay() to emit:
    • OperationStartInfo(..., is_replayed=True) for replayed checkpointed operations.
    • OperationEndInfo(..., is_replayed=True) when the replayed operation is terminal.
  • Suppressed duplicate replay lifecycle hooks per operation within an invocation using _replayed_operation_hooks.
  • Skipped replay lifecycle hooks for EXECUTION operations and READY operations.

Behavior Notes

For a replayed terminal operation, plugins now receive a pair of lifecycle callbacks:

  1. on_operation_start(info) with info.is_replayed is True
  2. on_operation_end(info) with info.is_replayed is True

For a replayed non-terminal operation such as STARTED, plugins receive only on_operation_start(info) with info.is_replayed is True.

Fresh checkpointed operations continue to emit the same lifecycle hooks as before, with is_replayed=False.

Test Fix Included

While running the full local unit suite, test_event_timeout_handling failed because its upper timing bound was too tight under local scheduler load. This PR makes that test less flaky by:

  • using time.monotonic() for elapsed time measurement,
  • relaxing the upper bound from 0.1s to 0.25s,
  • ensuring scheduler.stop() runs in a finally block even if the assertion fails.

Validation

Local checks run:

  • hatch fmt --check
  • hatch run types:check
  • hatch run test:all -q

The final full unit run passed: 2676 passed, 2 skipped.

@zhongkechen zhongkechen force-pushed the replay-plugin-operation-hooks branch from b44d55a to 30eb842 Compare June 30, 2026 00:30
@zhongkechen zhongkechen changed the title Emit plugin operation hooks during replay Otel: emit plugin operation hooks during replay Jun 30, 2026
def on_operation_replay(self, operation: Operation) -> None:
"""Execute plugins for a checkpointed operation observed during replay."""
if operation.status is OperationStatus.READY:
return

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check could possibly move to _emit_operation_replay_hooks, then it's all in one place?

def _emit_operation_replay_hooks(self, operation: Operation) -> None:
    """Emit operation hooks once for each checkpointed operation during replay."""
    if operation.operation_type is OperationType.EXECUTION:
        return
    if operation.status is OperationStatus.READY:        # moved here, before the set add
        return

    with self._replay_status_lock:
        if self._replay_status is not ReplayStatus.REPLAY:
            return
        if operation.operation_id in self._replayed_operation_hooks:
            return
        self._replayed_operation_hooks.add(operation.operation_id)

    self._plugin_executor.on_operation_replay(operation)

that way it happens together and the (highly theoretical) future possibility where the the op is recorded as handled even though no hook fired won't be there.

yaythomas
yaythomas previously approved these changes Jun 30, 2026

@yaythomas yaythomas left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice work in a tricky area! 🥇

I left a comment but it's just a suggestion, I didn't fully trace it through so I could easily be missing something. Have an approve regardless :shipit:

@yaythomas yaythomas left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

😃

Comment on lines +466 to +468
if operation.operation_id in self._replayed_operation_hooks:
return
self._replayed_operation_hooks.add(operation.operation_id)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused. What is this _replayed_operation_hooks variable used for? Wouldn't this reset on each invocation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_operation_replay method has been called for operations in this dict. It's reset per invocation.


def on_operation_replay(self, operation: Operation) -> None:
"""Execute plugins for a checkpointed operation observed during replay."""
start_info = OperationStartInfo(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, is this being executed on every single replay on every invocation?

@zhongkechen zhongkechen Jun 30, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

called once per operation per invocation if the operation's context is replayed. If the context is completed and the result is cached, its child operations will not be replayed and thus this will not be called.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this affect the Otel Plugin behaviour?

@zhongkechen

Copy link
Copy Markdown
Contributor Author

The tests hang for hours https://github.com/aws/aws-durable-execution-sdk-python/actions/runs/28416323893/job/84199892778?pr=496
I suspect it's a scheduler issue in a test case.

@zhongkechen zhongkechen merged commit 3228220 into main Jun 30, 2026
71 of 73 checks passed
@zhongkechen zhongkechen deleted the replay-plugin-operation-hooks branch June 30, 2026 17:54
@zhongkechen

Copy link
Copy Markdown
Contributor Author

merging now. Will address the test cases and other comments in the next PR

if operation.status is OperationStatus.READY:
return

with self._replay_status_lock:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still using the state level replay status, which will be removed in #488, do we want to keep this ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated to use the new per context replay status

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants