From 72f80628fb934ff28e69a712b38633f02c07747f Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 30 Jun 2026 13:46:24 -0700 Subject: [PATCH 1/3] fix(testing): scale local durable timers --- .../test/conftest.py | 10 ++++-- .../checkpoint/processors/step.py | 6 ++-- .../checkpoint/processors/wait.py | 7 ++-- .../time_scale.py | 32 +++++++++++++++++++ .../tests/checkpoint/processors/step_test.py | 27 ++++++++++++++++ .../tests/checkpoint/processors/wait_test.py | 18 +++++++++++ 6 files changed, 91 insertions(+), 9 deletions(-) create mode 100644 packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py diff --git a/packages/aws-durable-execution-sdk-python-examples/test/conftest.py b/packages/aws-durable-execution-sdk-python-examples/test/conftest.py index 9f0d593..f3e14df 100644 --- a/packages/aws-durable-execution-sdk-python-examples/test/conftest.py +++ b/packages/aws-durable-execution-sdk-python-examples/test/conftest.py @@ -162,7 +162,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): @pytest.fixture -def durable_runner(request): +def durable_runner(request, monkeypatch): """Pytest fixture that provides a test runner based on configuration. Configuration for cloud mode: @@ -221,8 +221,14 @@ def test_hello_world(durable_runner): else: if not handler: pytest.fail("handler is required for local mode tests") + time_scale = os.environ.get("DURABLE_EXECUTION_TIME_SCALE", "0.05") + monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", time_scale) + try: + poll_interval = min(0.05, max(0.001, float(time_scale))) + except ValueError: + poll_interval = 0.05 # Create local runner (needs cleanup via context manager) - runner = DurableFunctionTestRunner(handler=handler) + runner = DurableFunctionTestRunner(handler=handler, poll_interval=poll_interval) # Wrap in adapter and use context manager for proper cleanup with TestRunnerAdapter(runner, runner_mode) as adapter: diff --git a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/step.py b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/step.py index 0db5a0b..2e812ff 100644 --- a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/step.py +++ b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/step.py @@ -19,6 +19,7 @@ from aws_durable_execution_sdk_python_testing.exceptions import ( InvalidParameterValueException, ) +from aws_durable_execution_sdk_python_testing.time_scale import scale_delay if TYPE_CHECKING: @@ -50,7 +51,8 @@ def process( if update.step_options else 0 ) - next_attempt_time = datetime.now(UTC) + timedelta(seconds=delay) + scaled_delay = scale_delay(delay) + next_attempt_time = datetime.now(UTC) + timedelta(seconds=scaled_delay) # Build new step_details with incremented attempt current_attempt = ( @@ -103,7 +105,7 @@ def process( notifier.notify_step_retry_scheduled( execution_arn=execution_arn, operation_id=update.operation_id, - delay=delay, + delay=scaled_delay, ) return retry_operation case OperationAction.SUCCEED: diff --git a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py index 01cc69b..767085c 100644 --- a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py +++ b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py @@ -2,8 +2,6 @@ from __future__ import annotations -import logging -import os from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING @@ -21,6 +19,7 @@ from aws_durable_execution_sdk_python_testing.exceptions import ( InvalidParameterValueException, ) +from aws_durable_execution_sdk_python_testing.time_scale import scale_delay if TYPE_CHECKING: @@ -43,9 +42,7 @@ def process( wait_seconds = ( update.wait_options.wait_seconds if update.wait_options else 0 ) - time_scale = float(os.getenv("DURABLE_EXECUTION_TIME_SCALE", "1.0")) - logging.info("Using DURABLE_EXECUTION_TIME_SCALE: %f", time_scale) - scaled_wait_seconds = wait_seconds * time_scale + scaled_wait_seconds = scale_delay(wait_seconds) scheduled_end_timestamp = datetime.now(UTC) + timedelta( seconds=scaled_wait_seconds diff --git a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py new file mode 100644 index 0000000..d8f2d0d --- /dev/null +++ b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py @@ -0,0 +1,32 @@ +"""Helpers for scaling local durable timer delays in tests.""" + +from __future__ import annotations + +import logging +import os + + +logger = logging.getLogger(__name__) + +_TIME_SCALE_ENV = "DURABLE_EXECUTION_TIME_SCALE" + + +def get_time_scale() -> float: + """Return the configured local durable timer scale.""" + raw_scale = os.getenv(_TIME_SCALE_ENV, "1.0") + try: + scale = float(raw_scale) + except ValueError: + logger.warning("Ignoring invalid %s value: %s", _TIME_SCALE_ENV, raw_scale) + return 1.0 + + if scale < 0: + logger.warning("Ignoring negative %s value: %s", _TIME_SCALE_ENV, raw_scale) + return 1.0 + + return scale + + +def scale_delay(delay: float | int) -> float: + """Scale a durable timer delay for local testing.""" + return float(delay) * get_time_scale() diff --git a/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/step_test.py b/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/step_test.py index 6ba5cc6..d273f41 100644 --- a/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/step_test.py +++ b/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/step_test.py @@ -125,6 +125,33 @@ def test_process_retry_action(): assert notifier.step_retry_calls[0] == (execution_arn, "step-123", 30) +def test_process_retry_action_scales_retry_delay(monkeypatch): + monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.5") + processor = StepProcessor() + notifier = MockNotifier() + execution_arn = "arn:aws:states:us-east-1:123456789012:execution:test" + + current_op = Mock() + current_op.start_timestamp = datetime.now(UTC) + current_op.step_details = StepDetails(attempt=1) + current_op.execution_details = None + current_op.context_details = None + current_op.wait_details = None + current_op.callback_details = None + current_op.chained_invoke_details = None + + update = OperationUpdate( + operation_id="step-123", + operation_type=OperationType.STEP, + action=OperationAction.RETRY, + step_options=StepOptions(next_attempt_delay_seconds=30), + ) + + processor.process(update, current_op, notifier, execution_arn) + + assert notifier.step_retry_calls[0] == (execution_arn, "step-123", 15) + + def test_process_retry_action_without_step_options(): processor = StepProcessor() notifier = MockNotifier() diff --git a/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/wait_test.py b/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/wait_test.py index 42c29aa..1fae74b 100644 --- a/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/wait_test.py +++ b/packages/aws-durable-execution-sdk-python-testing/tests/checkpoint/processors/wait_test.py @@ -73,6 +73,24 @@ def test_process_start_action(): assert notifier.wait_timer_calls[0] == (execution_arn, "wait-123", 30) +def test_process_start_action_scales_wait_delay(monkeypatch): + monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.5") + processor = WaitProcessor() + notifier = MockNotifier() + execution_arn = "arn:aws:states:us-east-1:123456789012:execution:test" + + update = OperationUpdate( + operation_id="wait-123", + operation_type=OperationType.WAIT, + action=OperationAction.START, + wait_options=WaitOptions(wait_seconds=30), + ) + + processor.process(update, None, notifier, execution_arn) + + assert notifier.wait_timer_calls[0] == (execution_arn, "wait-123", 15) + + def test_process_start_action_without_wait_options(): processor = WaitProcessor() notifier = MockNotifier() From d119e0280001182cc8b208b4291e7f942f1e1560 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 30 Jun 2026 14:24:03 -0700 Subject: [PATCH 2/3] fix(testing): scale callback timers --- .../test/callback/test_callback_heartbeat.py | 1 + .../test/conftest.py | 4 +- .../test_wait_for_callback_heartbeat.py | 1 + .../executor.py | 14 +++++-- .../tests/executor_test.py | 40 +++++++++++++++++++ 5 files changed, 56 insertions(+), 4 deletions(-) diff --git a/packages/aws-durable-execution-sdk-python-examples/test/callback/test_callback_heartbeat.py b/packages/aws-durable-execution-sdk-python-examples/test/callback/test_callback_heartbeat.py index 66a99e9..fc5b559 100644 --- a/packages/aws-durable-execution-sdk-python-examples/test/callback/test_callback_heartbeat.py +++ b/packages/aws-durable-execution-sdk-python-examples/test/callback/test_callback_heartbeat.py @@ -12,6 +12,7 @@ @pytest.mark.durable_execution( handler=callback_heartbeat.handler, lambda_function_name="Create Callback Heartbeat", + time_scale="1.0", ) def test_handle_callback_operations_with_failure_uncaught(durable_runner): """Test handling callback operations with failure.""" diff --git a/packages/aws-durable-execution-sdk-python-examples/test/conftest.py b/packages/aws-durable-execution-sdk-python-examples/test/conftest.py index f3e14df..a99c378 100644 --- a/packages/aws-durable-execution-sdk-python-examples/test/conftest.py +++ b/packages/aws-durable-execution-sdk-python-examples/test/conftest.py @@ -221,7 +221,9 @@ def test_hello_world(durable_runner): else: if not handler: pytest.fail("handler is required for local mode tests") - time_scale = os.environ.get("DURABLE_EXECUTION_TIME_SCALE", "0.05") + time_scale = marker.kwargs.get( + "time_scale", os.environ.get("DURABLE_EXECUTION_TIME_SCALE", "0.05") + ) monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", time_scale) try: poll_interval = min(0.05, max(0.001, float(time_scale))) diff --git a/packages/aws-durable-execution-sdk-python-examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py b/packages/aws-durable-execution-sdk-python-examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py index bdbf627..f150ce8 100644 --- a/packages/aws-durable-execution-sdk-python-examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py +++ b/packages/aws-durable-execution-sdk-python-examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py @@ -14,6 +14,7 @@ @pytest.mark.durable_execution( handler=wait_for_callback_heartbeat.handler, lambda_function_name="Wait For Callback Heartbeat Sends", + time_scale="1.0", ) def test_handle_wait_for_callback_heartbeat_scenarios_during_long_running_submitter( durable_runner, diff --git a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py index 02a1504..5406170 100644 --- a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py @@ -55,6 +55,7 @@ Execution as ExecutionSummary, ) from aws_durable_execution_sdk_python_testing.observer import ExecutionObserver +from aws_durable_execution_sdk_python_testing.time_scale import scale_delay from aws_durable_execution_sdk_python_testing.token import CallbackToken @@ -1093,26 +1094,30 @@ def _schedule_callback_timeouts( # Schedule main timeout if configured if callback_options.timeout_seconds > 0: + timeout_delay = scale_delay(callback_options.timeout_seconds) def timeout_handler(): self._on_callback_timeout(execution_arn, callback_id) timeout_future = self._scheduler.call_later( timeout_handler, - delay=callback_options.timeout_seconds, + delay=timeout_delay, completion_event=completion_event, ) self._callback_timeouts[callback_id] = timeout_future # Schedule heartbeat timeout if configured if callback_options.heartbeat_timeout_seconds > 0: + heartbeat_delay = scale_delay( + callback_options.heartbeat_timeout_seconds + ) def heartbeat_timeout_handler(): self._on_callback_heartbeat_timeout(execution_arn, callback_id) heartbeat_future = self._scheduler.call_later( heartbeat_timeout_handler, - delay=callback_options.heartbeat_timeout_seconds, + delay=heartbeat_delay, completion_event=completion_event, ) self._callback_heartbeats[callback_id] = heartbeat_future @@ -1148,6 +1153,9 @@ def _reset_callback_heartbeat_timeout( break if callback_options and callback_options.heartbeat_timeout_seconds > 0: + heartbeat_delay = scale_delay( + callback_options.heartbeat_timeout_seconds + ) def heartbeat_timeout_handler(): self._on_callback_heartbeat_timeout(execution_arn, callback_id) @@ -1156,7 +1164,7 @@ def heartbeat_timeout_handler(): heartbeat_future = self._scheduler.call_later( heartbeat_timeout_handler, - delay=callback_options.heartbeat_timeout_seconds, + delay=heartbeat_delay, completion_event=completion_event, ) self._callback_heartbeats[callback_id] = heartbeat_future diff --git a/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py b/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py index e228a0d..0ed7393 100644 --- a/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py +++ b/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py @@ -2584,6 +2584,21 @@ def test_callback_timeout_scheduling(executor, mock_store, mock_scheduler): assert mock_scheduler.call_later.call_count == 2 # main timeout + heartbeat timeout +def test_callback_timeout_scheduling_scales_delays( + executor, mock_store, mock_scheduler, monkeypatch +): + """Test that callback timeout timers honor local time scaling.""" + monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05") + callback_options = CallbackOptions(timeout_seconds=60, heartbeat_timeout_seconds=30) + executor._completion_events["test-arn"] = Mock() + + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") + + timeout_call, heartbeat_call = mock_scheduler.call_later.call_args_list + assert timeout_call.kwargs["delay"] == 3.0 + assert heartbeat_call.kwargs["delay"] == 1.5 + + def test_callback_timeout_cleanup(executor, mock_store): """Test that callback timeouts are cleaned up when callback completes.""" # Create mock timeout events @@ -2636,6 +2651,31 @@ def test_callback_heartbeat_timeout_reset(executor, mock_store, mock_scheduler): mock_scheduler.call_later.assert_called() +def test_callback_heartbeat_timeout_reset_scales_delay( + executor, mock_store, mock_scheduler, monkeypatch +): + """Test that reset heartbeat timers honor local time scaling.""" + monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05") + callback_token = CallbackToken(execution_arn="test-arn", operation_id="op-123") + callback_id = callback_token.to_str() + mock_execution = Mock() + callback_options = CallbackOptions(heartbeat_timeout_seconds=30) + update = OperationUpdate( + operation_id="op-123", + operation_type=OperationType.CALLBACK, + action=OperationAction.START, + callback_options=callback_options, + ) + mock_execution.updates = [update] + mock_store.load.return_value = mock_execution + executor._callback_heartbeats[callback_id] = Mock() + + executor._reset_callback_heartbeat_timeout(callback_id, "test-arn") + + mock_scheduler.call_later.assert_called_once() + assert mock_scheduler.call_later.call_args.kwargs["delay"] == 1.5 + + def test_callback_timeout_handlers(executor, mock_store): """Test callback timeout and heartbeat timeout handlers.""" # Create callback token From 8729043d5383aa0befccc4758b68b31ccee119ed Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 30 Jun 2026 14:34:12 -0700 Subject: [PATCH 3/3] fix(testing): floor callback timer scaling --- .../executor.py | 13 +++++++--- .../time_scale.py | 8 ++++-- .../tests/executor_test.py | 25 +++++++++++++++---- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py index 5406170..6b95c7e 100644 --- a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/executor.py @@ -72,6 +72,8 @@ logger = logging.getLogger(__name__) +_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS = 5.0 + class Executor(ExecutionObserver): MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5 @@ -1094,7 +1096,10 @@ def _schedule_callback_timeouts( # Schedule main timeout if configured if callback_options.timeout_seconds > 0: - timeout_delay = scale_delay(callback_options.timeout_seconds) + timeout_delay = scale_delay( + callback_options.timeout_seconds, + minimum=_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS, + ) def timeout_handler(): self._on_callback_timeout(execution_arn, callback_id) @@ -1109,7 +1114,8 @@ def timeout_handler(): # Schedule heartbeat timeout if configured if callback_options.heartbeat_timeout_seconds > 0: heartbeat_delay = scale_delay( - callback_options.heartbeat_timeout_seconds + callback_options.heartbeat_timeout_seconds, + minimum=_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS, ) def heartbeat_timeout_handler(): @@ -1154,7 +1160,8 @@ def _reset_callback_heartbeat_timeout( if callback_options and callback_options.heartbeat_timeout_seconds > 0: heartbeat_delay = scale_delay( - callback_options.heartbeat_timeout_seconds + callback_options.heartbeat_timeout_seconds, + minimum=_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS, ) def heartbeat_timeout_handler(): diff --git a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py index d8f2d0d..7fd6873 100644 --- a/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py +++ b/packages/aws-durable-execution-sdk-python-testing/src/aws_durable_execution_sdk_python_testing/time_scale.py @@ -27,6 +27,10 @@ def get_time_scale() -> float: return scale -def scale_delay(delay: float | int) -> float: +def scale_delay(delay: float | int, *, minimum: float = 0) -> float: """Scale a durable timer delay for local testing.""" - return float(delay) * get_time_scale() + scaled_delay = float(delay) * get_time_scale() + if minimum <= 0: + return scaled_delay + + return max(scaled_delay, min(float(delay), minimum)) diff --git a/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py b/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py index 0ed7393..98f40ba 100644 --- a/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py +++ b/packages/aws-durable-execution-sdk-python-testing/tests/executor_test.py @@ -2587,7 +2587,7 @@ def test_callback_timeout_scheduling(executor, mock_store, mock_scheduler): def test_callback_timeout_scheduling_scales_delays( executor, mock_store, mock_scheduler, monkeypatch ): - """Test that callback timeout timers honor local time scaling.""" + """Test that callback timeout timers honor local time scaling with a floor.""" monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05") callback_options = CallbackOptions(timeout_seconds=60, heartbeat_timeout_seconds=30) executor._completion_events["test-arn"] = Mock() @@ -2595,8 +2595,23 @@ def test_callback_timeout_scheduling_scales_delays( executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") timeout_call, heartbeat_call = mock_scheduler.call_later.call_args_list - assert timeout_call.kwargs["delay"] == 3.0 - assert heartbeat_call.kwargs["delay"] == 1.5 + assert timeout_call.kwargs["delay"] == 5.0 + assert heartbeat_call.kwargs["delay"] == 5.0 + + +def test_callback_timeout_scheduling_keeps_short_delays( + executor, mock_store, mock_scheduler, monkeypatch +): + """Test that short callback timeout timers are not inflated by the floor.""" + monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05") + callback_options = CallbackOptions(timeout_seconds=1, heartbeat_timeout_seconds=2) + executor._completion_events["test-arn"] = Mock() + + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") + + timeout_call, heartbeat_call = mock_scheduler.call_later.call_args_list + assert timeout_call.kwargs["delay"] == 1.0 + assert heartbeat_call.kwargs["delay"] == 2.0 def test_callback_timeout_cleanup(executor, mock_store): @@ -2654,7 +2669,7 @@ def test_callback_heartbeat_timeout_reset(executor, mock_store, mock_scheduler): def test_callback_heartbeat_timeout_reset_scales_delay( executor, mock_store, mock_scheduler, monkeypatch ): - """Test that reset heartbeat timers honor local time scaling.""" + """Test that reset heartbeat timers honor local time scaling with a floor.""" monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05") callback_token = CallbackToken(execution_arn="test-arn", operation_id="op-123") callback_id = callback_token.to_str() @@ -2673,7 +2688,7 @@ def test_callback_heartbeat_timeout_reset_scales_delay( executor._reset_callback_heartbeat_timeout(callback_id, "test-arn") mock_scheduler.call_later.assert_called_once() - assert mock_scheduler.call_later.call_args.kwargs["delay"] == 1.5 + assert mock_scheduler.call_later.call_args.kwargs["delay"] == 5.0 def test_callback_timeout_handlers(executor, mock_store):