Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
@pytest.mark.durable_execution(
handler=callback_heartbeat.handler,
lambda_function_name="Create Callback Heartbeat",
time_scale="1.0",
)
def test_handle_callback_operations_with_failure_uncaught(durable_runner):
"""Test handling callback operations with failure."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):


@pytest.fixture
def durable_runner(request):
def durable_runner(request, monkeypatch):
"""Pytest fixture that provides a test runner based on configuration.

Configuration for cloud mode:
Expand Down Expand Up @@ -221,8 +221,16 @@ def test_hello_world(durable_runner):
else:
if not handler:
pytest.fail("handler is required for local mode tests")
time_scale = marker.kwargs.get(
"time_scale", os.environ.get("DURABLE_EXECUTION_TIME_SCALE", "0.05")
)
monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", time_scale)
try:
poll_interval = min(0.05, max(0.001, float(time_scale)))
except ValueError:
poll_interval = 0.05
# Create local runner (needs cleanup via context manager)
runner = DurableFunctionTestRunner(handler=handler)
runner = DurableFunctionTestRunner(handler=handler, poll_interval=poll_interval)

# Wrap in adapter and use context manager for proper cleanup
with TestRunnerAdapter(runner, runner_mode) as adapter:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@pytest.mark.durable_execution(
handler=wait_for_callback_heartbeat.handler,
lambda_function_name="Wait For Callback Heartbeat Sends",
time_scale="1.0",
)
def test_handle_wait_for_callback_heartbeat_scenarios_during_long_running_submitter(
durable_runner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from aws_durable_execution_sdk_python_testing.exceptions import (
InvalidParameterValueException,
)
from aws_durable_execution_sdk_python_testing.time_scale import scale_delay


if TYPE_CHECKING:
Expand Down Expand Up @@ -50,7 +51,8 @@ def process(
if update.step_options
else 0
)
next_attempt_time = datetime.now(UTC) + timedelta(seconds=delay)
scaled_delay = scale_delay(delay)
next_attempt_time = datetime.now(UTC) + timedelta(seconds=scaled_delay)

# Build new step_details with incremented attempt
current_attempt = (
Expand Down Expand Up @@ -103,7 +105,7 @@ def process(
notifier.notify_step_retry_scheduled(
execution_arn=execution_arn,
operation_id=update.operation_id,
delay=delay,
delay=scaled_delay,
)
return retry_operation
case OperationAction.SUCCEED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

from __future__ import annotations

import logging
import os
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING

Expand All @@ -21,6 +19,7 @@
from aws_durable_execution_sdk_python_testing.exceptions import (
InvalidParameterValueException,
)
from aws_durable_execution_sdk_python_testing.time_scale import scale_delay


if TYPE_CHECKING:
Expand All @@ -43,9 +42,7 @@ def process(
wait_seconds = (
update.wait_options.wait_seconds if update.wait_options else 0
)
time_scale = float(os.getenv("DURABLE_EXECUTION_TIME_SCALE", "1.0"))
logging.info("Using DURABLE_EXECUTION_TIME_SCALE: %f", time_scale)
scaled_wait_seconds = wait_seconds * time_scale
scaled_wait_seconds = scale_delay(wait_seconds)

scheduled_end_timestamp = datetime.now(UTC) + timedelta(
seconds=scaled_wait_seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
Execution as ExecutionSummary,
)
from aws_durable_execution_sdk_python_testing.observer import ExecutionObserver
from aws_durable_execution_sdk_python_testing.time_scale import scale_delay
from aws_durable_execution_sdk_python_testing.token import CallbackToken


Expand All @@ -71,6 +72,8 @@

logger = logging.getLogger(__name__)

_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS = 5.0


class Executor(ExecutionObserver):
MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5
Expand Down Expand Up @@ -1093,26 +1096,34 @@ def _schedule_callback_timeouts(

# Schedule main timeout if configured
if callback_options.timeout_seconds > 0:
timeout_delay = scale_delay(
callback_options.timeout_seconds,
minimum=_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS,
)

def timeout_handler():
self._on_callback_timeout(execution_arn, callback_id)

timeout_future = self._scheduler.call_later(
timeout_handler,
delay=callback_options.timeout_seconds,
delay=timeout_delay,
completion_event=completion_event,
)
self._callback_timeouts[callback_id] = timeout_future

# Schedule heartbeat timeout if configured
if callback_options.heartbeat_timeout_seconds > 0:
heartbeat_delay = scale_delay(
callback_options.heartbeat_timeout_seconds,
minimum=_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS,
)

def heartbeat_timeout_handler():
self._on_callback_heartbeat_timeout(execution_arn, callback_id)

heartbeat_future = self._scheduler.call_later(
heartbeat_timeout_handler,
delay=callback_options.heartbeat_timeout_seconds,
delay=heartbeat_delay,
completion_event=completion_event,
)
self._callback_heartbeats[callback_id] = heartbeat_future
Expand Down Expand Up @@ -1148,6 +1159,10 @@ def _reset_callback_heartbeat_timeout(
break

if callback_options and callback_options.heartbeat_timeout_seconds > 0:
heartbeat_delay = scale_delay(
callback_options.heartbeat_timeout_seconds,
minimum=_CALLBACK_TIMEOUT_MINIMUM_DELAY_SECONDS,
)

def heartbeat_timeout_handler():
self._on_callback_heartbeat_timeout(execution_arn, callback_id)
Expand All @@ -1156,7 +1171,7 @@ def heartbeat_timeout_handler():

heartbeat_future = self._scheduler.call_later(
heartbeat_timeout_handler,
delay=callback_options.heartbeat_timeout_seconds,
delay=heartbeat_delay,
completion_event=completion_event,
)
self._callback_heartbeats[callback_id] = heartbeat_future
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Helpers for scaling local durable timer delays in tests."""

from __future__ import annotations

import logging
import os


logger = logging.getLogger(__name__)

_TIME_SCALE_ENV = "DURABLE_EXECUTION_TIME_SCALE"


def get_time_scale() -> float:
"""Return the configured local durable timer scale."""
raw_scale = os.getenv(_TIME_SCALE_ENV, "1.0")
try:
scale = float(raw_scale)
except ValueError:
logger.warning("Ignoring invalid %s value: %s", _TIME_SCALE_ENV, raw_scale)
return 1.0

if scale < 0:
logger.warning("Ignoring negative %s value: %s", _TIME_SCALE_ENV, raw_scale)
return 1.0

return scale


def scale_delay(delay: float | int, *, minimum: float = 0) -> float:
"""Scale a durable timer delay for local testing."""
scaled_delay = float(delay) * get_time_scale()
if minimum <= 0:
return scaled_delay

return max(scaled_delay, min(float(delay), minimum))
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,33 @@ def test_process_retry_action():
assert notifier.step_retry_calls[0] == (execution_arn, "step-123", 30)


def test_process_retry_action_scales_retry_delay(monkeypatch):
monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.5")
processor = StepProcessor()
notifier = MockNotifier()
execution_arn = "arn:aws:states:us-east-1:123456789012:execution:test"

current_op = Mock()
current_op.start_timestamp = datetime.now(UTC)
current_op.step_details = StepDetails(attempt=1)
current_op.execution_details = None
current_op.context_details = None
current_op.wait_details = None
current_op.callback_details = None
current_op.chained_invoke_details = None

update = OperationUpdate(
operation_id="step-123",
operation_type=OperationType.STEP,
action=OperationAction.RETRY,
step_options=StepOptions(next_attempt_delay_seconds=30),
)

processor.process(update, current_op, notifier, execution_arn)

assert notifier.step_retry_calls[0] == (execution_arn, "step-123", 15)


def test_process_retry_action_without_step_options():
processor = StepProcessor()
notifier = MockNotifier()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ def test_process_start_action():
assert notifier.wait_timer_calls[0] == (execution_arn, "wait-123", 30)


def test_process_start_action_scales_wait_delay(monkeypatch):
monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.5")
processor = WaitProcessor()
notifier = MockNotifier()
execution_arn = "arn:aws:states:us-east-1:123456789012:execution:test"

update = OperationUpdate(
operation_id="wait-123",
operation_type=OperationType.WAIT,
action=OperationAction.START,
wait_options=WaitOptions(wait_seconds=30),
)

processor.process(update, None, notifier, execution_arn)

assert notifier.wait_timer_calls[0] == (execution_arn, "wait-123", 15)


def test_process_start_action_without_wait_options():
processor = WaitProcessor()
notifier = MockNotifier()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2576,6 +2576,36 @@ def test_callback_timeout_scheduling(executor, mock_store, mock_scheduler):
assert mock_scheduler.call_later.call_count == 2 # main timeout + heartbeat timeout


def test_callback_timeout_scheduling_scales_delays(
executor, mock_store, mock_scheduler, monkeypatch
):
"""Test that callback timeout timers honor local time scaling with a floor."""
monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05")
callback_options = CallbackOptions(timeout_seconds=60, heartbeat_timeout_seconds=30)
executor._completion_events["test-arn"] = Mock()

executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id")

timeout_call, heartbeat_call = mock_scheduler.call_later.call_args_list
assert timeout_call.kwargs["delay"] == 5.0
assert heartbeat_call.kwargs["delay"] == 5.0


def test_callback_timeout_scheduling_keeps_short_delays(
executor, mock_store, mock_scheduler, monkeypatch
):
"""Test that short callback timeout timers are not inflated by the floor."""
monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05")
callback_options = CallbackOptions(timeout_seconds=1, heartbeat_timeout_seconds=2)
executor._completion_events["test-arn"] = Mock()

executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id")

timeout_call, heartbeat_call = mock_scheduler.call_later.call_args_list
assert timeout_call.kwargs["delay"] == 1.0
assert heartbeat_call.kwargs["delay"] == 2.0


def test_callback_timeout_cleanup(executor, mock_store):
"""Test that callback timeouts are cleaned up when callback completes."""
# Create mock timeout events
Expand Down Expand Up @@ -2628,6 +2658,31 @@ def test_callback_heartbeat_timeout_reset(executor, mock_store, mock_scheduler):
mock_scheduler.call_later.assert_called()


def test_callback_heartbeat_timeout_reset_scales_delay(
executor, mock_store, mock_scheduler, monkeypatch
):
"""Test that reset heartbeat timers honor local time scaling with a floor."""
monkeypatch.setenv("DURABLE_EXECUTION_TIME_SCALE", "0.05")
callback_token = CallbackToken(execution_arn="test-arn", operation_id="op-123")
callback_id = callback_token.to_str()
mock_execution = Mock()
callback_options = CallbackOptions(heartbeat_timeout_seconds=30)
update = OperationUpdate(
operation_id="op-123",
operation_type=OperationType.CALLBACK,
action=OperationAction.START,
callback_options=callback_options,
)
mock_execution.updates = [update]
mock_store.load.return_value = mock_execution
executor._callback_heartbeats[callback_id] = Mock()

executor._reset_callback_heartbeat_timeout(callback_id, "test-arn")

mock_scheduler.call_later.assert_called_once()
assert mock_scheduler.call_later.call_args.kwargs["delay"] == 5.0


def test_callback_timeout_handlers(executor, mock_store):
"""Test callback timeout and heartbeat timeout handlers."""
# Create callback token
Expand Down