From 12ba6d6a98d3f73e87482f8cae0900c5153f07dd Mon Sep 17 00:00:00 2001 From: aballman <35912177+aballman@users.noreply.github.com> Date: Thu, 21 May 2026 08:48:02 -0700 Subject: [PATCH 1/3] feat(FS-1988): add min_attempts and absolute_max_elapsed_time_ms to BackoffStrategy Closes a short-circuit in retry_with_backoff{,_async} where a single slow first attempt can exhaust max_elapsed_time before any retry fires. With the partitioner config (max_elapsed_time=5min, CLIENT_TIMEOUT_MS=30min), any chunk whose first attempt exceeds 5 minutes blew the retry budget on attempt 1 -- zero retries on subsequent transient errors. Documented in two customer-visible regressions in the platform partition path. New fields on BackoffStrategy: * min_attempts (default 0) -- minimum number of retry attempts that must fire before max_elapsed_time is honored. Counts retries (not the initial attempt). Default 0 preserves existing behavior. * absolute_max_elapsed_time_ms (default None) -- cap on when a new retry can START. Does NOT interrupt an in-flight func() call. Worst-case wall-clock under this cap is absolute_max_elapsed_time_ms + per_attempt_timeout. Default None preserves existing behavior. Loop changes in both retry_with_backoff and retry_with_backoff_async: * Post-attempt cap check honors min_attempts as a floor on the soft cap and treats the hard cap as unconditional. * Pre-sleep hard-cap check refuses to sleep into a doomed retry whose projected start would exceed the hard cap. * Post-sleep verification (belt-and-suspenders against late wakeups and rounding drift in the projection). * Helper extraction (_cap_hit_after_attempt, _raise_or_return_after_cap) dedupes logic between sync and async paths. Validation rejects min_attempts < 0, absolute_max_elapsed_time_ms <= 0, and absolute_max_elapsed_time_ms below max_elapsed_time. .genignore: ignore src/unstructured_client/utils/retries.py to preserve these fields across Speakeasy regens. Documented merge procedure for future template updates. Pushing these fields upstream to Speakeasy templates is filed as a follow-up. Tests: * T1-T14 in unit/test_retries.py with a fake-clock harness that monkeypatches time.time / time.sleep / asyncio.sleep / random.uniform. Covers the v1 reproducer (slow first attempt + min_attempts floor), floor-is-not-a-ceiling semantics, hard cap overrides floor, sleep truncation, TemporaryError early-return through both caps, and PermanentError short-circuit immunity. * Validation tests for the new __init__ guards. * New integration test test_split_pdf_cache_tmp_data_chunk_request_stream_is_replay_safe pins the body-replay invariant: chunks built from open file objects (cache_tmp_data=True) must produce replay-safe httpx requests so SDK retries actually deliver the original multipart payload. Iterates request.stream twice directly to bypass request.read() caching. --- .genignore | 13 + CHANGELOG.md | 3 + .../integration/test_decorators.py | 76 +++ .../unit/test_retries.py | 551 +++++++++++++++++- src/unstructured_client/utils/retries.py | 218 ++++++- 5 files changed, 836 insertions(+), 25 deletions(-) diff --git a/.genignore b/.genignore index 8e5fcfd4..cbce1a37 100644 --- a/.genignore +++ b/.genignore @@ -19,3 +19,16 @@ src/unstructured_client/users.py # - Adjust the custom url snippets in the file # - Bring back the ignore line and commit src/unstructured_client/general.py + +# Ignore retries.py so we can carry the `min_attempts` and +# `absolute_max_elapsed_time_ms` fields on BackoffStrategy plus the +# soft/hard cap logic in retry_with_backoff{,_async}. Filed in FS-1988. +# Speakeasy regen would otherwise wipe these fields. To pick up upstream +# fixes to the generated retry runtime: +# - Comment out this ignore line +# - Generate locally +# - Manually re-merge the FS-1988 additions +# - Re-add the ignore line and commit +# Long-term: push these fields upstream to Speakeasy templates so this +# ignore can be removed. +src/unstructured_client/utils/retries.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c11d350..ac17c421 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ### Breaking changes * Removed deprecated connector config models from the SDK (e.g. `S3SourceConnectorConfig`, `AzureDestinationConnectorConfig`). Pass connector configs as plain dicts with arbitrary fields. The SDK is no longer coupled to backend connector schemas — new fields work without an SDK upgrade. +### Features +* Add `min_attempts` and `absolute_max_elapsed_time_ms` fields to `BackoffStrategy`. `min_attempts` is the minimum number of retry attempts that must fire before `max_elapsed_time` is honored; defaults to `0` (preserves existing behavior). `absolute_max_elapsed_time_ms` caps when a new retry can start (does not interrupt in-flight requests); defaults to `None`. Together these close a short-circuit where a single slow first attempt could exhaust the retry budget before any retry fired. See FS-1988. + ## 0.43.4 ### Enhancements diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index 1fd69bfb..41aa5f67 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -813,3 +813,79 @@ async def mock_send(_, request: httpx.Request, **kwargs): assert number_of_transport_failures == 0 assert mock_endpoint_called assert res.status_code == 200 + + +def test_split_pdf_cache_tmp_data_chunk_request_stream_is_replay_safe(tmp_path): + """Regression test for FS-1988: when split_pdf_cache_tmp_data=True + is set, chunks are loaded as opened file objects (not BytesIO) and + handed to `create_pdf_chunk_request`. The resulting `httpx.Request` + must be replay-safe — i.e. its body stream can be iterated more + than once, returning identical bytes each time. + + Body replay is what makes SDK-level retries on transient errors + (ReadTimeout, ConnectError, etc.) actually deliver the original + multipart payload to the server. A future Speakeasy template + change or refactor of `serialize_request_body` that produced a + single-consumption stream (e.g. an `Iterable[bytes]` over an open + file handle) would silently break retries: the server would see + an empty body on the second attempt. + + The invariant is pinned by iterating `request.stream` twice + directly — NOT via `request.read()`, which caches into + `Request._content` and would paper over a non-replayable stream + after the first call. This is the actual transport-level path + httpcore uses when `AsyncClient.send` retries a request. + """ + from unstructured_client._hooks.custom.request_utils import ( + create_pdf_chunk_request, + ) + + # Drive `create_pdf_chunk_request` directly with a file-object + # chunk — the cache_tmp_data=True branch in `_get_pdf_chunk_files`. + chunk_path = tmp_path / "chunk.pdf" + src_bytes = Path("_sample_docs/layout-parser-paper.pdf").read_bytes() + chunk_path.write_bytes(src_bytes) + + pdf_chunk_file = open(chunk_path, "rb") # noqa: SIM115 -- closed manually + try: + form_data = { + "files": (chunk_path.name, src_bytes, "application/pdf"), + "strategy": "fast", + } + original_request = httpx.Request( + method="POST", + url="http://localhost:8000/general/v0/general", + headers={ + "Content-Type": "multipart/form-data; boundary=test", + "User-Agent": "test", + }, + content=b"", + ) + + chunk_request = create_pdf_chunk_request( + form_data=form_data, + pdf_chunk=(pdf_chunk_file, 1), + original_request=original_request, + filename=chunk_path.name, + ) + + # Iterate the body stream twice. If the underlying stream is + # not replayable (e.g. an open file handle that's exhausted + # after the first pass), the second iteration yields empty + # or partial bytes and the assert below fails. + first_pass = b"".join(chunk_request.stream) + second_pass = b"".join(chunk_request.stream) + + assert len(first_pass) > 1000, ( + f"First iteration produced too-small body ({len(first_pass)} " + "bytes); a real PDF chunk multipart envelope should be at " + "least kilobytes." + ) + assert first_pass == second_pass, ( + "Body stream not replay-safe: second iteration of " + "chunk_request.stream returned different bytes than the " + "first. SDK retries on transient errors would silently " + "send a truncated or empty body to the server." + ) + finally: + pdf_chunk_file.close() diff --git a/_test_unstructured_client/unit/test_retries.py b/_test_unstructured_client/unit/test_retries.py index fffbba8e..9fc58c53 100644 --- a/_test_unstructured_client/unit/test_retries.py +++ b/_test_unstructured_client/unit/test_retries.py @@ -1,21 +1,115 @@ -"""Tests for retry logic covering all TransportError subclasses.""" +"""Tests for retry logic covering all TransportError subclasses +and the FS-1988 min_attempts / absolute_max_elapsed_time_ms semantics.""" import asyncio +import time as time_module from unittest.mock import MagicMock import httpx import pytest +from unstructured_client.utils import retries as retries_module from unstructured_client.utils.retries import ( BackoffStrategy, PermanentError, Retries, RetryConfig, + TemporaryError, retry, retry_async, ) +# ----------------------------------------------------------------- +# FS-1988 fake-clock harness +# +# `retries.py` reads `time.time()` and calls `time.sleep()` / +# `asyncio.sleep()`. The retries module imports `time` and `asyncio` +# at module scope, so we patch those module attributes. Each test +# starts the clock at 0ms and advances it explicitly: +# - `time.sleep(s)` advances the fake clock by s*1000 ms +# - `asyncio.sleep(s)` advances the fake clock by s*1000 ms +# - the test `func` callable advances the clock by its own simulated +# attempt duration before raising / returning +# ----------------------------------------------------------------- + + +class FakeClock: + """Mutable monotonic clock for retry loop tests. + + Tracks elapsed milliseconds since fixture creation. `now_ms` is + incremented by patched sleep functions and by test func() bodies. + `time()` returns a float seconds value compatible with + `round(time.time() * 1000)` patterns in retries.py. + """ + + def __init__(self): + self.now_ms = 0 + + def time(self) -> float: + return self.now_ms / 1000.0 + + def advance(self, milliseconds: int) -> None: + self.now_ms += int(milliseconds) + + +@pytest.fixture +def fake_clock(monkeypatch): + clock = FakeClock() + + def fake_time(): + return clock.time() + + def fake_sync_sleep(seconds): + clock.advance(int(seconds * 1000)) + + async def fake_async_sleep(seconds): + clock.advance(int(seconds * 1000)) + + monkeypatch.setattr(retries_module.time, "time", fake_time) + monkeypatch.setattr(retries_module.time, "sleep", fake_sync_sleep) + monkeypatch.setattr(retries_module.asyncio, "sleep", fake_async_sleep) + # Strip jitter so backoff durations are exactly reproducible. + monkeypatch.setattr( + retries_module.random, "uniform", lambda a, b: 0.0 + ) + return clock + + +def _retries( + *, + initial_interval: int = 100, + max_interval: int = 200, + exponent: float = 1.5, + max_elapsed_time: int = 5_000, + min_attempts: int = 0, + absolute_max_elapsed_time_ms=None, + retry_connection_errors: bool = True, + status_codes=None, +) -> Retries: + return Retries( + config=RetryConfig( + strategy="backoff", + backoff=BackoffStrategy( + initial_interval=initial_interval, + max_interval=max_interval, + exponent=exponent, + max_elapsed_time=max_elapsed_time, + min_attempts=min_attempts, + absolute_max_elapsed_time_ms=absolute_max_elapsed_time_ms, + ), + retry_connection_errors=retry_connection_errors, + ), + status_codes=status_codes or [], + ) + + +def _make_ok_response(status_code: int = 200) -> httpx.Response: + resp = MagicMock(spec=httpx.Response) + resp.status_code = status_code + return resp + + def _make_retries(retry_connection_errors: bool) -> Retries: return Retries( config=RetryConfig( @@ -109,3 +203,458 @@ async def func(): with pytest.raises(exc_class): asyncio.run(retry_async(func, retries_config)) + + +# ----------------------------------------------------------------- +# FS-1988 tests: BackoffStrategy validation (T12-T14) +# These do not need the fake-clock harness. +# ----------------------------------------------------------------- + + +class TestBackoffStrategyValidation: + def test_t12_negative_min_attempts_rejected(self): + with pytest.raises(ValueError, match="min_attempts must be >= 0"): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + min_attempts=-1, + ) + + def test_t13_zero_absolute_max_rejected(self): + with pytest.raises(ValueError, match="absolute_max_elapsed_time_ms must be > 0"): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + absolute_max_elapsed_time_ms=0, + ) + + def test_t14_absolute_max_below_soft_max_rejected(self): + with pytest.raises( + ValueError, + match="absolute_max_elapsed_time_ms must be >= max_elapsed_time", + ): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=10_000, + absolute_max_elapsed_time_ms=5_000, + ) + + def test_min_attempts_zero_accepted(self): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + min_attempts=0, + ) + + def test_absolute_max_equal_to_soft_max_accepted(self): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + absolute_max_elapsed_time_ms=5_000, + ) + + +# ----------------------------------------------------------------- +# FS-1988 tests: T1, T2 -- backward compatibility (min_attempts=0) +# ----------------------------------------------------------------- + + +class TestBackwardCompat: + def test_t1_no_failures_sync(self, fake_clock): + retries_config = _retries() + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 1 + + def test_t1_no_failures_async(self, fake_clock): + retries_config = _retries() + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + return _make_ok_response() + + result = asyncio.run(retry_async(func, retries_config)) + assert result.status_code == 200 + assert call_count == 1 + + def test_t2_one_transient_then_success_sync(self, fake_clock): + retries_config = _retries() + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.ConnectError("transient") + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 2 + + def test_t2_one_transient_then_success_async(self, fake_clock): + retries_config = _retries() + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.ConnectError("transient") + return _make_ok_response() + + result = asyncio.run(retry_async(func, retries_config)) + assert result.status_code == 200 + assert call_count == 2 + + +# ----------------------------------------------------------------- +# FS-1988 tests: T3 -- the v1 reproducer. +# Attempt 1 takes longer than the soft cap; the min_attempts floor +# must permit attempt 2 to fire, which then succeeds. +# ----------------------------------------------------------------- + + +class TestSoftCapFloor: + def test_t3_slow_first_attempt_then_success_sync(self, fake_clock): + # Soft cap 5s, attempt 1 takes 6s, min_attempts=2. + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(6_000) # 6s + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 2 + + def test_t3_slow_first_attempt_then_success_async(self, fake_clock): + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + ) + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(6_000) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + result = asyncio.run(retry_async(func, retries_config)) + assert result.status_code == 200 + assert call_count == 2 + + def test_t3_baseline_pre_fix_behavior(self, fake_clock): + """Without the min_attempts floor (min_attempts=0), the same + scenario short-circuits with no retries -- this is the pre-fix + regression. Documents the bug class we are closing.""" + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=0, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(6_000) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + # Without the floor, soft cap fires on attempt 1's failure and + # the loop raises the wrapped ReadTimeout. + with pytest.raises(httpx.ReadTimeout): + retry(func, retries_config) + assert call_count == 1 + + +# ----------------------------------------------------------------- +# FS-1988 tests: T4, T5 -- floor is NOT a ceiling. +# ----------------------------------------------------------------- + + +class TestFloorIsNotCeiling: + def test_t4_floor_does_not_cap_attempts_sync(self, fake_clock): + """min_attempts=2 with 5 transient failures + budget should + produce 6 total attempts, NOT 3.""" + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=60_000, # generous budget + min_attempts=2, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count <= 5: + raise httpx.ConnectError("transient") + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 6 + + def test_t5_floor_then_soft_cap_sync(self, fake_clock): + """With min_attempts=2 and all attempts failing fast, the + loop must fire AT LEAST 3 total attempts (initial + 2 retries) + before the soft cap can cut in.""" + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=10_000, + min_attempts=2, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + # Each attempt itself is instantaneous; only the backoff + # sleep advances the fake clock. + raise httpx.ConnectError("transient") + + with pytest.raises(httpx.ConnectError): + retry(func, retries_config) + # 3 = initial + 2 retries (the min_attempts floor) + assert call_count >= 3 + + +# ----------------------------------------------------------------- +# FS-1988 tests: T6, T7 -- hard cap overrides the floor; sleep +# truncation prevents sleeping into a doomed retry. +# ----------------------------------------------------------------- + + +class TestHardCap: + def test_t6_hard_cap_overrides_floor_sync(self, fake_clock): + """min_attempts=2 but attempt 1 takes longer than the hard + cap. Hard cap must fire even though the floor is unsatisfied.""" + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + absolute_max_elapsed_time_ms=10_000, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(11_000) # 11s > 10s hard cap + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + with pytest.raises(httpx.ReadTimeout): + retry(func, retries_config) + # Hard cap fires after attempt 1; no further attempts. + assert call_count == 1 + + def test_t6_hard_cap_overrides_floor_async(self, fake_clock): + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + absolute_max_elapsed_time_ms=10_000, + ) + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(11_000) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + with pytest.raises(httpx.ReadTimeout): + asyncio.run(retry_async(func, retries_config)) + assert call_count == 1 + + def test_t7_sleep_truncation_prevents_doomed_retry_sync(self, fake_clock): + """Hard cap pre-emptive check: if the sleep alone would push + past the cap, raise immediately rather than sleeping into a + retry that can never start.""" + # Soft cap 5s, hard cap 10s, attempt 1 takes 9.95s. + # Sleep starts at 100ms * 1.5^0 = 100ms (no jitter in tests), + # but we want to assert the sleep+elapsed > hard_cap path + # fires. Use larger initial_interval to make the calculation + # land deterministically. + retries_config = _retries( + initial_interval=200, # 200ms initial + max_interval=1_000, + exponent=2.0, + max_elapsed_time=5_000, + min_attempts=2, # floor is unsatisfied; only hard cap can cut + absolute_max_elapsed_time_ms=10_000, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + # attempt 1 lasts 9.95s -> elapsed = 9.95s, sleep would be + # 200ms -> projected = 10.15s >= hard cap 10s -> raise. + fake_clock.advance(9_950) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + with pytest.raises(httpx.ReadTimeout): + retry(func, retries_config) + assert call_count == 1 + + +# ----------------------------------------------------------------- +# FS-1988 tests: T8, T9, T10 -- TemporaryError (retryable 5xx) +# early-return behavior under soft and hard cap exhaustion. +# ----------------------------------------------------------------- + + +class TestTemporaryErrorEarlyReturn: + def test_t8_soft_cap_5xx_returns_last_response_sync(self, fake_clock): + """min_attempts=0, retryable 5xx, soft cap exhausted: returns + the last 5xx response (existing behavior).""" + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=500, + min_attempts=0, + status_codes=["5xx"], + ) + bad_response = _make_ok_response(status_code=503) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + # Each attempt itself is instant; backoff sleep accumulates. + return bad_response + + result = retry(func, retries_config) + # Should return the last 5xx response, not raise. + assert result.status_code == 503 + assert call_count >= 1 + + def test_t9_floor_then_soft_cap_5xx_returns_last_response_sync(self, fake_clock): + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=1_000, + min_attempts=2, + status_codes=["5xx"], + ) + bad_response = _make_ok_response(status_code=503) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + return bad_response + + result = retry(func, retries_config) + assert result.status_code == 503 + # Floor enforced: at least initial + 2 retries before soft cap. + assert call_count >= 3 + + def test_t10_hard_cap_5xx_returns_last_response_sync(self, fake_clock): + """Hard cap fires; TemporaryError still returns the response, + not a raised exception.""" + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + min_attempts=2, + absolute_max_elapsed_time_ms=10_000, + status_codes=["5xx"], + ) + bad_response = _make_ok_response(status_code=503) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(11_000) # past hard cap + return bad_response + + result = retry(func, retries_config) + assert result.status_code == 503 + assert call_count == 1 + + +# ----------------------------------------------------------------- +# FS-1988 tests: T11 -- PermanentError unaffected by min_attempts. +# ----------------------------------------------------------------- + + +class TestPermanentErrorShortCircuit: + """A non-transport, non-status-code exception raised from func() is + wrapped by retry/retry_async's inner do_request closure in a + PermanentError. The outer retry_with_backoff{,_async} catches + PermanentError and re-raises `.inner`, which short-circuits the loop + regardless of min_attempts. Verifies the floor does NOT force retries + on permanent failures.""" + + def test_t11_permanent_error_one_attempt_sync(self, fake_clock): + retries_config = _retries(min_attempts=10) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + raise ValueError("permanent") + + with pytest.raises(ValueError, match="permanent"): + retry(func, retries_config) + # Even with min_attempts=10, PermanentError short-circuits. + assert call_count == 1 + + def test_t11_permanent_error_one_attempt_async(self, fake_clock): + retries_config = _retries(min_attempts=10) + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + raise ValueError("permanent") + + with pytest.raises(ValueError, match="permanent"): + asyncio.run(retry_async(func, retries_config)) + assert call_count == 1 diff --git a/src/unstructured_client/utils/retries.py b/src/unstructured_client/utils/retries.py index 4b903b58..4a109479 100644 --- a/src/unstructured_client/utils/retries.py +++ b/src/unstructured_client/utils/retries.py @@ -13,6 +13,8 @@ class BackoffStrategy: max_interval: int exponent: float max_elapsed_time: int + min_attempts: int + absolute_max_elapsed_time_ms: int | None def __init__( self, @@ -20,11 +22,57 @@ def __init__( max_interval: int, exponent: float, max_elapsed_time: int, + min_attempts: int = 0, + absolute_max_elapsed_time_ms: int | None = None, ): + # `min_attempts`: minimum number of RETRY attempts (NOT counting + # the initial attempt) that must fire before `max_elapsed_time` + # is honored. Despite the name, this counts retries only: + # `min_attempts=2` permits the initial attempt PLUS at least 2 + # retries (3 total attempts) before the soft elapsed-time budget + # can cut the loop. Default `0` preserves prior behavior (budget + # honored from the first failed attempt). The field is named + # `min_attempts` for symmetry with the existing public-API + # vocabulary; the docstring is the contract. + # Exists to close the short-circuit where a single slow first + # attempt blows the soft budget and silently disables retries on + # subsequent transient errors. See FS-1988. + # + # `absolute_max_elapsed_time_ms`: cap on when a new retry can + # START. When set, no new retry attempt will be started past + # this elapsed time, regardless of `min_attempts`. In-flight + # `func()` calls are NOT interrupted — an attempt that begins + # just before the cap may run to its per-attempt timeout (e.g. + # the httpx client timeout) and exceed the cap. Worst-case + # wall-clock under this cap is therefore + # `absolute_max_elapsed_time_ms + per_attempt_timeout`, not + # `absolute_max_elapsed_time_ms`. To bound the true wall-clock, + # pair this with a per-attempt timeout no larger than the + # acceptable additional headroom past the cap. + # Default `None` preserves prior behavior (no cap on retry + # start; `min_attempts` and `max_elapsed_time` alone gate the + # loop). See FS-1988 PLAN.md "Worst-case wall-clock" section. + if min_attempts < 0: + raise ValueError( + f"min_attempts must be >= 0, got {min_attempts}" + ) + if absolute_max_elapsed_time_ms is not None: + if absolute_max_elapsed_time_ms <= 0: + raise ValueError( + "absolute_max_elapsed_time_ms must be > 0, " + f"got {absolute_max_elapsed_time_ms}" + ) + if absolute_max_elapsed_time_ms < max_elapsed_time: + raise ValueError( + "absolute_max_elapsed_time_ms must be >= max_elapsed_time " + f"({max_elapsed_time}), got {absolute_max_elapsed_time_ms}" + ) self.initial_interval = initial_interval self.max_interval = max_interval self.exponent = exponent self.max_elapsed_time = max_elapsed_time + self.min_attempts = min_attempts + self.absolute_max_elapsed_time_ms = absolute_max_elapsed_time_ms class RetryConfig: @@ -102,6 +150,8 @@ def do_request() -> httpx.Response: retries.config.backoff.max_interval, retries.config.backoff.exponent, retries.config.backoff.max_elapsed_time, + retries.config.backoff.min_attempts, + retries.config.backoff.absolute_max_elapsed_time_ms, ) return func() @@ -146,17 +196,55 @@ async def do_request() -> httpx.Response: retries.config.backoff.max_interval, retries.config.backoff.exponent, retries.config.backoff.max_elapsed_time, + retries.config.backoff.min_attempts, + retries.config.backoff.absolute_max_elapsed_time_ms, ) return await func() +def _cap_hit_after_attempt( + elapsed_ms, + retries, + min_attempts, + max_elapsed_time, + absolute_max_elapsed_time_ms, +): + """Return True if either the soft or hard cap is already exceeded. + + Called after an attempt fails, before deciding to sleep. Soft cap is + only honored once the `min_attempts` retry floor is satisfied; hard + cap (if set) is honored unconditionally. + """ + soft_cap_hit = retries >= min_attempts and elapsed_ms > max_elapsed_time + hard_cap_hit = ( + absolute_max_elapsed_time_ms is not None + and elapsed_ms > absolute_max_elapsed_time_ms + ) + return soft_cap_hit or hard_cap_hit + + +def _raise_or_return_after_cap(exception, elapsed_ms, retries, reason): + """Final handling when a cap is hit. Mirrors the original behavior + of returning the TemporaryError's response for retryable status codes + so the caller sees the last response rather than a stack trace.""" + if isinstance(exception, TemporaryError): + return exception.response + elapsed_seconds = elapsed_ms / 1000 + raise type(exception)( + f"{type(exception).__name__} after {retries + 1} attempts " + f"over {elapsed_seconds:.1f}s{reason}: {exception}" + ) from exception + + def retry_with_backoff( func, initial_interval=500, max_interval=60000, exponent=1.5, max_elapsed_time=3600000, + min_attempts=0, + absolute_max_elapsed_time_ms=None, ): start = round(time.time() * 1000) retries = 0 @@ -168,18 +256,61 @@ def retry_with_backoff( raise exception.inner except Exception as exception: # pylint: disable=broad-exception-caught now = round(time.time() * 1000) - if now - start > max_elapsed_time: - if isinstance(exception, TemporaryError): - return exception.response - - elapsed_seconds = (now - start) / 1000 - raise type(exception)( - f"{type(exception).__name__} after {retries + 1} attempts " - f"over {elapsed_seconds:.1f}s: {exception}" - ) from exception - sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1) - sleep = min(sleep, max_interval / 1000) - time.sleep(sleep) + elapsed = now - start + + # Post-attempt cap check: either cap exceeded by the attempt + # that just finished -> stop the loop. + if _cap_hit_after_attempt( + elapsed, + retries, + min_attempts, + max_elapsed_time, + absolute_max_elapsed_time_ms, + ): + result = _raise_or_return_after_cap( + exception, elapsed, retries, "" + ) + return result + + sleep_seconds = ( + (initial_interval / 1000) * exponent**retries + + random.uniform(0, 1) + ) + sleep_seconds = min(sleep_seconds, max_interval / 1000) + + # Pre-emptive hard-cap check: if the sleep alone would push us + # past the hard cap, no point sleeping into a doomed retry. + # Note: this prevents STARTING a new attempt past the cap, it + # does NOT interrupt an in-flight func() -- see docstring on + # BackoffStrategy.absolute_max_elapsed_time_ms. + if absolute_max_elapsed_time_ms is not None: + projected_elapsed = elapsed + int(sleep_seconds * 1000) + if projected_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + elapsed, + retries, + " (hard cap would be exceeded during backoff)", + ) + return result + + time.sleep(sleep_seconds) + + # Post-sleep verification: belt-and-suspenders against late + # wakeups (OS scheduling) and rounding errors in the + # pre-sleep projection. If the actual elapsed time after the + # sleep is past the hard cap, do not start the next attempt. + if absolute_max_elapsed_time_ms is not None: + actual_elapsed = round(time.time() * 1000) - start + if actual_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + actual_elapsed, + retries, + " (hard cap reached during backoff sleep)", + ) + return result + retries += 1 @@ -189,6 +320,8 @@ async def retry_with_backoff_async( max_interval=60000, exponent=1.5, max_elapsed_time=3600000, + min_attempts=0, + absolute_max_elapsed_time_ms=None, ): start = round(time.time() * 1000) retries = 0 @@ -200,16 +333,53 @@ async def retry_with_backoff_async( raise exception.inner except Exception as exception: # pylint: disable=broad-exception-caught now = round(time.time() * 1000) - if now - start > max_elapsed_time: - if isinstance(exception, TemporaryError): - return exception.response - - elapsed_seconds = (now - start) / 1000 - raise type(exception)( - f"{type(exception).__name__} after {retries + 1} attempts " - f"over {elapsed_seconds:.1f}s: {exception}" - ) from exception - sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1) - sleep = min(sleep, max_interval / 1000) - await asyncio.sleep(sleep) + elapsed = now - start + + if _cap_hit_after_attempt( + elapsed, + retries, + min_attempts, + max_elapsed_time, + absolute_max_elapsed_time_ms, + ): + result = _raise_or_return_after_cap( + exception, elapsed, retries, "" + ) + return result + + sleep_seconds = ( + (initial_interval / 1000) * exponent**retries + + random.uniform(0, 1) + ) + sleep_seconds = min(sleep_seconds, max_interval / 1000) + + if absolute_max_elapsed_time_ms is not None: + projected_elapsed = elapsed + int(sleep_seconds * 1000) + if projected_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + elapsed, + retries, + " (hard cap would be exceeded during backoff)", + ) + return result + + await asyncio.sleep(sleep_seconds) + + # Post-sleep verification: belt-and-suspenders against late + # wakeups (event loop scheduling, asyncio.sleep imprecision) + # and rounding errors in the pre-sleep projection. If the + # actual elapsed time after the sleep is past the hard cap, + # do not start the next attempt. + if absolute_max_elapsed_time_ms is not None: + actual_elapsed = round(time.time() * 1000) - start + if actual_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + actual_elapsed, + retries, + " (hard cap reached during backoff sleep)", + ) + return result + retries += 1 From d85deb1b1f4292584166ac6b928c0438df060fe5 Mon Sep 17 00:00:00 2001 From: aballman <35912177+aballman@users.noreply.github.com> Date: Tue, 26 May 2026 16:30:44 -0700 Subject: [PATCH 2/3] chore(FS-1988): trim verbose source comments --- .genignore | 13 +-- .../integration/test_decorators.py | 42 +------ .../unit/test_retries.py | 105 +----------------- src/unstructured_client/utils/retries.py | 55 +-------- 4 files changed, 10 insertions(+), 205 deletions(-) diff --git a/.genignore b/.genignore index cbce1a37..d53d4afc 100644 --- a/.genignore +++ b/.genignore @@ -20,15 +20,6 @@ src/unstructured_client/users.py # - Bring back the ignore line and commit src/unstructured_client/general.py -# Ignore retries.py so we can carry the `min_attempts` and -# `absolute_max_elapsed_time_ms` fields on BackoffStrategy plus the -# soft/hard cap logic in retry_with_backoff{,_async}. Filed in FS-1988. -# Speakeasy regen would otherwise wipe these fields. To pick up upstream -# fixes to the generated retry runtime: -# - Comment out this ignore line -# - Generate locally -# - Manually re-merge the FS-1988 additions -# - Re-add the ignore line and commit -# Long-term: push these fields upstream to Speakeasy templates so this -# ignore can be removed. +# Custom min_attempts / absolute_max_elapsed_time_ms fields on BackoffStrategy +# (FS-1988). Push upstream to Speakeasy templates to remove this entry. src/unstructured_client/utils/retries.py diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index 41aa5f67..0657b2a4 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -816,37 +816,15 @@ async def mock_send(_, request: httpx.Request, **kwargs): def test_split_pdf_cache_tmp_data_chunk_request_stream_is_replay_safe(tmp_path): - """Regression test for FS-1988: when split_pdf_cache_tmp_data=True - is set, chunks are loaded as opened file objects (not BytesIO) and - handed to `create_pdf_chunk_request`. The resulting `httpx.Request` - must be replay-safe — i.e. its body stream can be iterated more - than once, returning identical bytes each time. - - Body replay is what makes SDK-level retries on transient errors - (ReadTimeout, ConnectError, etc.) actually deliver the original - multipart payload to the server. A future Speakeasy template - change or refactor of `serialize_request_body` that produced a - single-consumption stream (e.g. an `Iterable[bytes]` over an open - file handle) would silently break retries: the server would see - an empty body on the second attempt. - - The invariant is pinned by iterating `request.stream` twice - directly — NOT via `request.read()`, which caches into - `Request._content` and would paper over a non-replayable stream - after the first call. This is the actual transport-level path - httpcore uses when `AsyncClient.send` retries a request. - """ from unstructured_client._hooks.custom.request_utils import ( create_pdf_chunk_request, ) - # Drive `create_pdf_chunk_request` directly with a file-object - # chunk — the cache_tmp_data=True branch in `_get_pdf_chunk_files`. chunk_path = tmp_path / "chunk.pdf" src_bytes = Path("_sample_docs/layout-parser-paper.pdf").read_bytes() chunk_path.write_bytes(src_bytes) - pdf_chunk_file = open(chunk_path, "rb") # noqa: SIM115 -- closed manually + pdf_chunk_file = open(chunk_path, "rb") # noqa: SIM115 try: form_data = { "files": (chunk_path.name, src_bytes, "application/pdf"), @@ -869,23 +847,11 @@ def test_split_pdf_cache_tmp_data_chunk_request_stream_is_replay_safe(tmp_path): filename=chunk_path.name, ) - # Iterate the body stream twice. If the underlying stream is - # not replayable (e.g. an open file handle that's exhausted - # after the first pass), the second iteration yields empty - # or partial bytes and the assert below fails. + # Iterate twice without request.read() to bypass _content caching. first_pass = b"".join(chunk_request.stream) second_pass = b"".join(chunk_request.stream) - assert len(first_pass) > 1000, ( - f"First iteration produced too-small body ({len(first_pass)} " - "bytes); a real PDF chunk multipart envelope should be at " - "least kilobytes." - ) - assert first_pass == second_pass, ( - "Body stream not replay-safe: second iteration of " - "chunk_request.stream returned different bytes than the " - "first. SDK retries on transient errors would silently " - "send a truncated or empty body to the server." - ) + assert len(first_pass) > 1000 + assert first_pass == second_pass finally: pdf_chunk_file.close() diff --git a/_test_unstructured_client/unit/test_retries.py b/_test_unstructured_client/unit/test_retries.py index 9fc58c53..2a1c89f5 100644 --- a/_test_unstructured_client/unit/test_retries.py +++ b/_test_unstructured_client/unit/test_retries.py @@ -1,6 +1,3 @@ -"""Tests for retry logic covering all TransportError subclasses -and the FS-1988 min_attempts / absolute_max_elapsed_time_ms semantics.""" - import asyncio import time as time_module from unittest.mock import MagicMock @@ -20,29 +17,7 @@ ) -# ----------------------------------------------------------------- -# FS-1988 fake-clock harness -# -# `retries.py` reads `time.time()` and calls `time.sleep()` / -# `asyncio.sleep()`. The retries module imports `time` and `asyncio` -# at module scope, so we patch those module attributes. Each test -# starts the clock at 0ms and advances it explicitly: -# - `time.sleep(s)` advances the fake clock by s*1000 ms -# - `asyncio.sleep(s)` advances the fake clock by s*1000 ms -# - the test `func` callable advances the clock by its own simulated -# attempt duration before raising / returning -# ----------------------------------------------------------------- - - class FakeClock: - """Mutable monotonic clock for retry loop tests. - - Tracks elapsed milliseconds since fixture creation. `now_ms` is - incremented by patched sleep functions and by test func() bodies. - `time()` returns a float seconds value compatible with - `round(time.time() * 1000)` patterns in retries.py. - """ - def __init__(self): self.now_ms = 0 @@ -69,10 +44,7 @@ async def fake_async_sleep(seconds): monkeypatch.setattr(retries_module.time, "time", fake_time) monkeypatch.setattr(retries_module.time, "sleep", fake_sync_sleep) monkeypatch.setattr(retries_module.asyncio, "sleep", fake_async_sleep) - # Strip jitter so backoff durations are exactly reproducible. - monkeypatch.setattr( - retries_module.random, "uniform", lambda a, b: 0.0 - ) + monkeypatch.setattr(retries_module.random, "uniform", lambda a, b: 0.0) return clock @@ -126,7 +98,6 @@ def _make_retries(retry_connection_errors: bool) -> Retries: ) -# All TransportError subclasses that should be retried TRANSPORT_ERRORS = [ (httpx.ConnectError, "Connection refused"), (httpx.RemoteProtocolError, "Server disconnected without sending a response."), @@ -138,7 +109,6 @@ def _make_retries(retry_connection_errors: bool) -> Retries: class TestTransportErrorRetry: - """All httpx.TransportError subclasses should be retried when retry_connection_errors=True.""" @pytest.mark.parametrize("exc_class,msg", TRANSPORT_ERRORS) def test_transport_error_retried_when_enabled(self, exc_class, msg): @@ -172,7 +142,6 @@ def func(): class TestTransportErrorRetryAsync: - """Async: All httpx.TransportError subclasses should be retried.""" @pytest.mark.parametrize("exc_class,msg", TRANSPORT_ERRORS) def test_transport_error_retried_async(self, exc_class, msg): @@ -205,12 +174,6 @@ async def func(): asyncio.run(retry_async(func, retries_config)) -# ----------------------------------------------------------------- -# FS-1988 tests: BackoffStrategy validation (T12-T14) -# These do not need the fake-clock harness. -# ----------------------------------------------------------------- - - class TestBackoffStrategyValidation: def test_t12_negative_min_attempts_rejected(self): with pytest.raises(ValueError, match="min_attempts must be >= 0"): @@ -264,11 +227,6 @@ def test_absolute_max_equal_to_soft_max_accepted(self): ) -# ----------------------------------------------------------------- -# FS-1988 tests: T1, T2 -- backward compatibility (min_attempts=0) -# ----------------------------------------------------------------- - - class TestBackwardCompat: def test_t1_no_failures_sync(self, fake_clock): retries_config = _retries() @@ -327,16 +285,8 @@ async def func(): assert call_count == 2 -# ----------------------------------------------------------------- -# FS-1988 tests: T3 -- the v1 reproducer. -# Attempt 1 takes longer than the soft cap; the min_attempts floor -# must permit attempt 2 to fire, which then succeeds. -# ----------------------------------------------------------------- - - class TestSoftCapFloor: def test_t3_slow_first_attempt_then_success_sync(self, fake_clock): - # Soft cap 5s, attempt 1 takes 6s, min_attempts=2. retries_config = _retries( max_elapsed_time=5_000, min_attempts=2, @@ -375,9 +325,6 @@ async def func(): assert call_count == 2 def test_t3_baseline_pre_fix_behavior(self, fake_clock): - """Without the min_attempts floor (min_attempts=0), the same - scenario short-circuits with no retries -- this is the pre-fix - regression. Documents the bug class we are closing.""" retries_config = _retries( max_elapsed_time=5_000, min_attempts=0, @@ -399,15 +346,8 @@ def func(): assert call_count == 1 -# ----------------------------------------------------------------- -# FS-1988 tests: T4, T5 -- floor is NOT a ceiling. -# ----------------------------------------------------------------- - - class TestFloorIsNotCeiling: def test_t4_floor_does_not_cap_attempts_sync(self, fake_clock): - """min_attempts=2 with 5 transient failures + budget should - produce 6 total attempts, NOT 3.""" retries_config = _retries( initial_interval=100, max_interval=200, @@ -429,9 +369,6 @@ def func(): assert call_count == 6 def test_t5_floor_then_soft_cap_sync(self, fake_clock): - """With min_attempts=2 and all attempts failing fast, the - loop must fire AT LEAST 3 total attempts (initial + 2 retries) - before the soft cap can cut in.""" retries_config = _retries( initial_interval=100, max_interval=200, @@ -444,26 +381,16 @@ def test_t5_floor_then_soft_cap_sync(self, fake_clock): def func(): nonlocal call_count call_count += 1 - # Each attempt itself is instantaneous; only the backoff # sleep advances the fake clock. raise httpx.ConnectError("transient") with pytest.raises(httpx.ConnectError): retry(func, retries_config) - # 3 = initial + 2 retries (the min_attempts floor) assert call_count >= 3 -# ----------------------------------------------------------------- -# FS-1988 tests: T6, T7 -- hard cap overrides the floor; sleep -# truncation prevents sleeping into a doomed retry. -# ----------------------------------------------------------------- - - class TestHardCap: def test_t6_hard_cap_overrides_floor_sync(self, fake_clock): - """min_attempts=2 but attempt 1 takes longer than the hard - cap. Hard cap must fire even though the floor is unsatisfied.""" retries_config = _retries( max_elapsed_time=5_000, min_attempts=2, @@ -481,7 +408,6 @@ def func(): with pytest.raises(httpx.ReadTimeout): retry(func, retries_config) - # Hard cap fires after attempt 1; no further attempts. assert call_count == 1 def test_t6_hard_cap_overrides_floor_async(self, fake_clock): @@ -505,10 +431,6 @@ async def func(): assert call_count == 1 def test_t7_sleep_truncation_prevents_doomed_retry_sync(self, fake_clock): - """Hard cap pre-emptive check: if the sleep alone would push - past the cap, raise immediately rather than sleeping into a - retry that can never start.""" - # Soft cap 5s, hard cap 10s, attempt 1 takes 9.95s. # Sleep starts at 100ms * 1.5^0 = 100ms (no jitter in tests), # but we want to assert the sleep+elapsed > hard_cap path # fires. Use larger initial_interval to make the calculation @@ -538,16 +460,8 @@ def func(): assert call_count == 1 -# ----------------------------------------------------------------- -# FS-1988 tests: T8, T9, T10 -- TemporaryError (retryable 5xx) -# early-return behavior under soft and hard cap exhaustion. -# ----------------------------------------------------------------- - - class TestTemporaryErrorEarlyReturn: def test_t8_soft_cap_5xx_returns_last_response_sync(self, fake_clock): - """min_attempts=0, retryable 5xx, soft cap exhausted: returns - the last 5xx response (existing behavior).""" retries_config = _retries( initial_interval=100, max_interval=200, @@ -562,11 +476,9 @@ def test_t8_soft_cap_5xx_returns_last_response_sync(self, fake_clock): def func(): nonlocal call_count call_count += 1 - # Each attempt itself is instant; backoff sleep accumulates. return bad_response result = retry(func, retries_config) - # Should return the last 5xx response, not raise. assert result.status_code == 503 assert call_count >= 1 @@ -589,12 +501,9 @@ def func(): result = retry(func, retries_config) assert result.status_code == 503 - # Floor enforced: at least initial + 2 retries before soft cap. assert call_count >= 3 def test_t10_hard_cap_5xx_returns_last_response_sync(self, fake_clock): - """Hard cap fires; TemporaryError still returns the response, - not a raised exception.""" retries_config = _retries( initial_interval=100, max_interval=200, @@ -619,18 +528,7 @@ def func(): assert call_count == 1 -# ----------------------------------------------------------------- -# FS-1988 tests: T11 -- PermanentError unaffected by min_attempts. -# ----------------------------------------------------------------- - - class TestPermanentErrorShortCircuit: - """A non-transport, non-status-code exception raised from func() is - wrapped by retry/retry_async's inner do_request closure in a - PermanentError. The outer retry_with_backoff{,_async} catches - PermanentError and re-raises `.inner`, which short-circuits the loop - regardless of min_attempts. Verifies the floor does NOT force retries - on permanent failures.""" def test_t11_permanent_error_one_attempt_sync(self, fake_clock): retries_config = _retries(min_attempts=10) @@ -643,7 +541,6 @@ def func(): with pytest.raises(ValueError, match="permanent"): retry(func, retries_config) - # Even with min_attempts=10, PermanentError short-circuits. assert call_count == 1 def test_t11_permanent_error_one_attempt_async(self, fake_clock): diff --git a/src/unstructured_client/utils/retries.py b/src/unstructured_client/utils/retries.py index 4a109479..9e06c648 100644 --- a/src/unstructured_client/utils/retries.py +++ b/src/unstructured_client/utils/retries.py @@ -25,33 +25,9 @@ def __init__( min_attempts: int = 0, absolute_max_elapsed_time_ms: int | None = None, ): - # `min_attempts`: minimum number of RETRY attempts (NOT counting - # the initial attempt) that must fire before `max_elapsed_time` - # is honored. Despite the name, this counts retries only: - # `min_attempts=2` permits the initial attempt PLUS at least 2 - # retries (3 total attempts) before the soft elapsed-time budget - # can cut the loop. Default `0` preserves prior behavior (budget - # honored from the first failed attempt). The field is named - # `min_attempts` for symmetry with the existing public-API - # vocabulary; the docstring is the contract. - # Exists to close the short-circuit where a single slow first - # attempt blows the soft budget and silently disables retries on - # subsequent transient errors. See FS-1988. - # - # `absolute_max_elapsed_time_ms`: cap on when a new retry can - # START. When set, no new retry attempt will be started past - # this elapsed time, regardless of `min_attempts`. In-flight - # `func()` calls are NOT interrupted — an attempt that begins - # just before the cap may run to its per-attempt timeout (e.g. - # the httpx client timeout) and exceed the cap. Worst-case - # wall-clock under this cap is therefore - # `absolute_max_elapsed_time_ms + per_attempt_timeout`, not - # `absolute_max_elapsed_time_ms`. To bound the true wall-clock, - # pair this with a per-attempt timeout no larger than the - # acceptable additional headroom past the cap. - # Default `None` preserves prior behavior (no cap on retry - # start; `min_attempts` and `max_elapsed_time` alone gate the - # loop). See FS-1988 PLAN.md "Worst-case wall-clock" section. + # min_attempts counts retries (not the initial attempt). + # absolute_max_elapsed_time_ms caps when a new retry can start; + # it does not interrupt in-flight func() calls. if min_attempts < 0: raise ValueError( f"min_attempts must be >= 0, got {min_attempts}" @@ -210,12 +186,6 @@ def _cap_hit_after_attempt( max_elapsed_time, absolute_max_elapsed_time_ms, ): - """Return True if either the soft or hard cap is already exceeded. - - Called after an attempt fails, before deciding to sleep. Soft cap is - only honored once the `min_attempts` retry floor is satisfied; hard - cap (if set) is honored unconditionally. - """ soft_cap_hit = retries >= min_attempts and elapsed_ms > max_elapsed_time hard_cap_hit = ( absolute_max_elapsed_time_ms is not None @@ -225,9 +195,6 @@ def _cap_hit_after_attempt( def _raise_or_return_after_cap(exception, elapsed_ms, retries, reason): - """Final handling when a cap is hit. Mirrors the original behavior - of returning the TemporaryError's response for retryable status codes - so the caller sees the last response rather than a stack trace.""" if isinstance(exception, TemporaryError): return exception.response elapsed_seconds = elapsed_ms / 1000 @@ -258,8 +225,6 @@ def retry_with_backoff( now = round(time.time() * 1000) elapsed = now - start - # Post-attempt cap check: either cap exceeded by the attempt - # that just finished -> stop the loop. if _cap_hit_after_attempt( elapsed, retries, @@ -278,11 +243,6 @@ def retry_with_backoff( ) sleep_seconds = min(sleep_seconds, max_interval / 1000) - # Pre-emptive hard-cap check: if the sleep alone would push us - # past the hard cap, no point sleeping into a doomed retry. - # Note: this prevents STARTING a new attempt past the cap, it - # does NOT interrupt an in-flight func() -- see docstring on - # BackoffStrategy.absolute_max_elapsed_time_ms. if absolute_max_elapsed_time_ms is not None: projected_elapsed = elapsed + int(sleep_seconds * 1000) if projected_elapsed >= absolute_max_elapsed_time_ms: @@ -296,10 +256,6 @@ def retry_with_backoff( time.sleep(sleep_seconds) - # Post-sleep verification: belt-and-suspenders against late - # wakeups (OS scheduling) and rounding errors in the - # pre-sleep projection. If the actual elapsed time after the - # sleep is past the hard cap, do not start the next attempt. if absolute_max_elapsed_time_ms is not None: actual_elapsed = round(time.time() * 1000) - start if actual_elapsed >= absolute_max_elapsed_time_ms: @@ -366,11 +322,6 @@ async def retry_with_backoff_async( await asyncio.sleep(sleep_seconds) - # Post-sleep verification: belt-and-suspenders against late - # wakeups (event loop scheduling, asyncio.sleep imprecision) - # and rounding errors in the pre-sleep projection. If the - # actual elapsed time after the sleep is past the hard cap, - # do not start the next attempt. if absolute_max_elapsed_time_ms is not None: actual_elapsed = round(time.time() * 1000) - start if actual_elapsed >= absolute_max_elapsed_time_ms: From 97e28be7b6e37178fe25196f742691620b2c2cbd Mon Sep 17 00:00:00 2001 From: aballman <35912177+aballman@users.noreply.github.com> Date: Tue, 26 May 2026 17:19:31 -0700 Subject: [PATCH 3/3] chore: drop ticket ref from .genignore comment --- .genignore | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.genignore b/.genignore index d53d4afc..ea1fba41 100644 --- a/.genignore +++ b/.genignore @@ -20,6 +20,6 @@ src/unstructured_client/users.py # - Bring back the ignore line and commit src/unstructured_client/general.py -# Custom min_attempts / absolute_max_elapsed_time_ms fields on BackoffStrategy -# (FS-1988). Push upstream to Speakeasy templates to remove this entry. +# Custom min_attempts / absolute_max_elapsed_time_ms fields on BackoffStrategy. +# Push upstream to Speakeasy templates to remove this entry. src/unstructured_client/utils/retries.py