diff --git a/.genignore b/.genignore index 8e5fcfd4..ea1fba41 100644 --- a/.genignore +++ b/.genignore @@ -19,3 +19,7 @@ src/unstructured_client/users.py # - Adjust the custom url snippets in the file # - Bring back the ignore line and commit src/unstructured_client/general.py + +# Custom min_attempts / absolute_max_elapsed_time_ms fields on BackoffStrategy. +# Push upstream to Speakeasy templates to remove this entry. +src/unstructured_client/utils/retries.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c11d350..ac17c421 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ### Breaking changes * Removed deprecated connector config models from the SDK (e.g. `S3SourceConnectorConfig`, `AzureDestinationConnectorConfig`). Pass connector configs as plain dicts with arbitrary fields. The SDK is no longer coupled to backend connector schemas — new fields work without an SDK upgrade. +### Features +* Add `min_attempts` and `absolute_max_elapsed_time_ms` fields to `BackoffStrategy`. `min_attempts` is the minimum number of retry attempts that must fire before `max_elapsed_time` is honored; defaults to `0` (preserves existing behavior). `absolute_max_elapsed_time_ms` caps when a new retry can start (does not interrupt in-flight requests); defaults to `None`. Together these close a short-circuit where a single slow first attempt could exhaust the retry budget before any retry fired. See FS-1988. + ## 0.43.4 ### Enhancements diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index 1fd69bfb..0657b2a4 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -813,3 +813,45 @@ async def mock_send(_, request: httpx.Request, **kwargs): assert number_of_transport_failures == 0 assert mock_endpoint_called assert res.status_code == 200 + + +def test_split_pdf_cache_tmp_data_chunk_request_stream_is_replay_safe(tmp_path): + from unstructured_client._hooks.custom.request_utils import ( + create_pdf_chunk_request, + ) + + chunk_path = tmp_path / "chunk.pdf" + src_bytes = Path("_sample_docs/layout-parser-paper.pdf").read_bytes() + chunk_path.write_bytes(src_bytes) + + pdf_chunk_file = open(chunk_path, "rb") # noqa: SIM115 + try: + form_data = { + "files": (chunk_path.name, src_bytes, "application/pdf"), + "strategy": "fast", + } + original_request = httpx.Request( + method="POST", + url="http://localhost:8000/general/v0/general", + headers={ + "Content-Type": "multipart/form-data; boundary=test", + "User-Agent": "test", + }, + content=b"", + ) + + chunk_request = create_pdf_chunk_request( + form_data=form_data, + pdf_chunk=(pdf_chunk_file, 1), + original_request=original_request, + filename=chunk_path.name, + ) + + # Iterate twice without request.read() to bypass _content caching. + first_pass = b"".join(chunk_request.stream) + second_pass = b"".join(chunk_request.stream) + + assert len(first_pass) > 1000 + assert first_pass == second_pass + finally: + pdf_chunk_file.close() diff --git a/_test_unstructured_client/unit/test_retries.py b/_test_unstructured_client/unit/test_retries.py index fffbba8e..2a1c89f5 100644 --- a/_test_unstructured_client/unit/test_retries.py +++ b/_test_unstructured_client/unit/test_retries.py @@ -1,21 +1,87 @@ -"""Tests for retry logic covering all TransportError subclasses.""" - import asyncio +import time as time_module from unittest.mock import MagicMock import httpx import pytest +from unstructured_client.utils import retries as retries_module from unstructured_client.utils.retries import ( BackoffStrategy, PermanentError, Retries, RetryConfig, + TemporaryError, retry, retry_async, ) +class FakeClock: + def __init__(self): + self.now_ms = 0 + + def time(self) -> float: + return self.now_ms / 1000.0 + + def advance(self, milliseconds: int) -> None: + self.now_ms += int(milliseconds) + + +@pytest.fixture +def fake_clock(monkeypatch): + clock = FakeClock() + + def fake_time(): + return clock.time() + + def fake_sync_sleep(seconds): + clock.advance(int(seconds * 1000)) + + async def fake_async_sleep(seconds): + clock.advance(int(seconds * 1000)) + + monkeypatch.setattr(retries_module.time, "time", fake_time) + monkeypatch.setattr(retries_module.time, "sleep", fake_sync_sleep) + monkeypatch.setattr(retries_module.asyncio, "sleep", fake_async_sleep) + monkeypatch.setattr(retries_module.random, "uniform", lambda a, b: 0.0) + return clock + + +def _retries( + *, + initial_interval: int = 100, + max_interval: int = 200, + exponent: float = 1.5, + max_elapsed_time: int = 5_000, + min_attempts: int = 0, + absolute_max_elapsed_time_ms=None, + retry_connection_errors: bool = True, + status_codes=None, +) -> Retries: + return Retries( + config=RetryConfig( + strategy="backoff", + backoff=BackoffStrategy( + initial_interval=initial_interval, + max_interval=max_interval, + exponent=exponent, + max_elapsed_time=max_elapsed_time, + min_attempts=min_attempts, + absolute_max_elapsed_time_ms=absolute_max_elapsed_time_ms, + ), + retry_connection_errors=retry_connection_errors, + ), + status_codes=status_codes or [], + ) + + +def _make_ok_response(status_code: int = 200) -> httpx.Response: + resp = MagicMock(spec=httpx.Response) + resp.status_code = status_code + return resp + + def _make_retries(retry_connection_errors: bool) -> Retries: return Retries( config=RetryConfig( @@ -32,7 +98,6 @@ def _make_retries(retry_connection_errors: bool) -> Retries: ) -# All TransportError subclasses that should be retried TRANSPORT_ERRORS = [ (httpx.ConnectError, "Connection refused"), (httpx.RemoteProtocolError, "Server disconnected without sending a response."), @@ -44,7 +109,6 @@ def _make_retries(retry_connection_errors: bool) -> Retries: class TestTransportErrorRetry: - """All httpx.TransportError subclasses should be retried when retry_connection_errors=True.""" @pytest.mark.parametrize("exc_class,msg", TRANSPORT_ERRORS) def test_transport_error_retried_when_enabled(self, exc_class, msg): @@ -78,7 +142,6 @@ def func(): class TestTransportErrorRetryAsync: - """Async: All httpx.TransportError subclasses should be retried.""" @pytest.mark.parametrize("exc_class,msg", TRANSPORT_ERRORS) def test_transport_error_retried_async(self, exc_class, msg): @@ -109,3 +172,386 @@ async def func(): with pytest.raises(exc_class): asyncio.run(retry_async(func, retries_config)) + + +class TestBackoffStrategyValidation: + def test_t12_negative_min_attempts_rejected(self): + with pytest.raises(ValueError, match="min_attempts must be >= 0"): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + min_attempts=-1, + ) + + def test_t13_zero_absolute_max_rejected(self): + with pytest.raises(ValueError, match="absolute_max_elapsed_time_ms must be > 0"): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + absolute_max_elapsed_time_ms=0, + ) + + def test_t14_absolute_max_below_soft_max_rejected(self): + with pytest.raises( + ValueError, + match="absolute_max_elapsed_time_ms must be >= max_elapsed_time", + ): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=10_000, + absolute_max_elapsed_time_ms=5_000, + ) + + def test_min_attempts_zero_accepted(self): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + min_attempts=0, + ) + + def test_absolute_max_equal_to_soft_max_accepted(self): + BackoffStrategy( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + absolute_max_elapsed_time_ms=5_000, + ) + + +class TestBackwardCompat: + def test_t1_no_failures_sync(self, fake_clock): + retries_config = _retries() + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 1 + + def test_t1_no_failures_async(self, fake_clock): + retries_config = _retries() + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + return _make_ok_response() + + result = asyncio.run(retry_async(func, retries_config)) + assert result.status_code == 200 + assert call_count == 1 + + def test_t2_one_transient_then_success_sync(self, fake_clock): + retries_config = _retries() + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.ConnectError("transient") + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 2 + + def test_t2_one_transient_then_success_async(self, fake_clock): + retries_config = _retries() + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.ConnectError("transient") + return _make_ok_response() + + result = asyncio.run(retry_async(func, retries_config)) + assert result.status_code == 200 + assert call_count == 2 + + +class TestSoftCapFloor: + def test_t3_slow_first_attempt_then_success_sync(self, fake_clock): + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(6_000) # 6s + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 2 + + def test_t3_slow_first_attempt_then_success_async(self, fake_clock): + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + ) + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(6_000) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + result = asyncio.run(retry_async(func, retries_config)) + assert result.status_code == 200 + assert call_count == 2 + + def test_t3_baseline_pre_fix_behavior(self, fake_clock): + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=0, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(6_000) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + # Without the floor, soft cap fires on attempt 1's failure and + # the loop raises the wrapped ReadTimeout. + with pytest.raises(httpx.ReadTimeout): + retry(func, retries_config) + assert call_count == 1 + + +class TestFloorIsNotCeiling: + def test_t4_floor_does_not_cap_attempts_sync(self, fake_clock): + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=60_000, # generous budget + min_attempts=2, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count <= 5: + raise httpx.ConnectError("transient") + return _make_ok_response() + + result = retry(func, retries_config) + assert result.status_code == 200 + assert call_count == 6 + + def test_t5_floor_then_soft_cap_sync(self, fake_clock): + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=10_000, + min_attempts=2, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + # sleep advances the fake clock. + raise httpx.ConnectError("transient") + + with pytest.raises(httpx.ConnectError): + retry(func, retries_config) + assert call_count >= 3 + + +class TestHardCap: + def test_t6_hard_cap_overrides_floor_sync(self, fake_clock): + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + absolute_max_elapsed_time_ms=10_000, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(11_000) # 11s > 10s hard cap + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + with pytest.raises(httpx.ReadTimeout): + retry(func, retries_config) + assert call_count == 1 + + def test_t6_hard_cap_overrides_floor_async(self, fake_clock): + retries_config = _retries( + max_elapsed_time=5_000, + min_attempts=2, + absolute_max_elapsed_time_ms=10_000, + ) + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(11_000) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + with pytest.raises(httpx.ReadTimeout): + asyncio.run(retry_async(func, retries_config)) + assert call_count == 1 + + def test_t7_sleep_truncation_prevents_doomed_retry_sync(self, fake_clock): + # Sleep starts at 100ms * 1.5^0 = 100ms (no jitter in tests), + # but we want to assert the sleep+elapsed > hard_cap path + # fires. Use larger initial_interval to make the calculation + # land deterministically. + retries_config = _retries( + initial_interval=200, # 200ms initial + max_interval=1_000, + exponent=2.0, + max_elapsed_time=5_000, + min_attempts=2, # floor is unsatisfied; only hard cap can cut + absolute_max_elapsed_time_ms=10_000, + ) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + # attempt 1 lasts 9.95s -> elapsed = 9.95s, sleep would be + # 200ms -> projected = 10.15s >= hard cap 10s -> raise. + fake_clock.advance(9_950) + raise httpx.ReadTimeout("slow") + return _make_ok_response() + + with pytest.raises(httpx.ReadTimeout): + retry(func, retries_config) + assert call_count == 1 + + +class TestTemporaryErrorEarlyReturn: + def test_t8_soft_cap_5xx_returns_last_response_sync(self, fake_clock): + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=500, + min_attempts=0, + status_codes=["5xx"], + ) + bad_response = _make_ok_response(status_code=503) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + return bad_response + + result = retry(func, retries_config) + assert result.status_code == 503 + assert call_count >= 1 + + def test_t9_floor_then_soft_cap_5xx_returns_last_response_sync(self, fake_clock): + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=1_000, + min_attempts=2, + status_codes=["5xx"], + ) + bad_response = _make_ok_response(status_code=503) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + return bad_response + + result = retry(func, retries_config) + assert result.status_code == 503 + assert call_count >= 3 + + def test_t10_hard_cap_5xx_returns_last_response_sync(self, fake_clock): + retries_config = _retries( + initial_interval=100, + max_interval=200, + exponent=1.5, + max_elapsed_time=5_000, + min_attempts=2, + absolute_max_elapsed_time_ms=10_000, + status_codes=["5xx"], + ) + bad_response = _make_ok_response(status_code=503) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + fake_clock.advance(11_000) # past hard cap + return bad_response + + result = retry(func, retries_config) + assert result.status_code == 503 + assert call_count == 1 + + +class TestPermanentErrorShortCircuit: + + def test_t11_permanent_error_one_attempt_sync(self, fake_clock): + retries_config = _retries(min_attempts=10) + call_count = 0 + + def func(): + nonlocal call_count + call_count += 1 + raise ValueError("permanent") + + with pytest.raises(ValueError, match="permanent"): + retry(func, retries_config) + assert call_count == 1 + + def test_t11_permanent_error_one_attempt_async(self, fake_clock): + retries_config = _retries(min_attempts=10) + call_count = 0 + + async def func(): + nonlocal call_count + call_count += 1 + raise ValueError("permanent") + + with pytest.raises(ValueError, match="permanent"): + asyncio.run(retry_async(func, retries_config)) + assert call_count == 1 diff --git a/src/unstructured_client/utils/retries.py b/src/unstructured_client/utils/retries.py index 4b903b58..9e06c648 100644 --- a/src/unstructured_client/utils/retries.py +++ b/src/unstructured_client/utils/retries.py @@ -13,6 +13,8 @@ class BackoffStrategy: max_interval: int exponent: float max_elapsed_time: int + min_attempts: int + absolute_max_elapsed_time_ms: int | None def __init__( self, @@ -20,11 +22,33 @@ def __init__( max_interval: int, exponent: float, max_elapsed_time: int, + min_attempts: int = 0, + absolute_max_elapsed_time_ms: int | None = None, ): + # min_attempts counts retries (not the initial attempt). + # absolute_max_elapsed_time_ms caps when a new retry can start; + # it does not interrupt in-flight func() calls. + if min_attempts < 0: + raise ValueError( + f"min_attempts must be >= 0, got {min_attempts}" + ) + if absolute_max_elapsed_time_ms is not None: + if absolute_max_elapsed_time_ms <= 0: + raise ValueError( + "absolute_max_elapsed_time_ms must be > 0, " + f"got {absolute_max_elapsed_time_ms}" + ) + if absolute_max_elapsed_time_ms < max_elapsed_time: + raise ValueError( + "absolute_max_elapsed_time_ms must be >= max_elapsed_time " + f"({max_elapsed_time}), got {absolute_max_elapsed_time_ms}" + ) self.initial_interval = initial_interval self.max_interval = max_interval self.exponent = exponent self.max_elapsed_time = max_elapsed_time + self.min_attempts = min_attempts + self.absolute_max_elapsed_time_ms = absolute_max_elapsed_time_ms class RetryConfig: @@ -102,6 +126,8 @@ def do_request() -> httpx.Response: retries.config.backoff.max_interval, retries.config.backoff.exponent, retries.config.backoff.max_elapsed_time, + retries.config.backoff.min_attempts, + retries.config.backoff.absolute_max_elapsed_time_ms, ) return func() @@ -146,17 +172,46 @@ async def do_request() -> httpx.Response: retries.config.backoff.max_interval, retries.config.backoff.exponent, retries.config.backoff.max_elapsed_time, + retries.config.backoff.min_attempts, + retries.config.backoff.absolute_max_elapsed_time_ms, ) return await func() +def _cap_hit_after_attempt( + elapsed_ms, + retries, + min_attempts, + max_elapsed_time, + absolute_max_elapsed_time_ms, +): + soft_cap_hit = retries >= min_attempts and elapsed_ms > max_elapsed_time + hard_cap_hit = ( + absolute_max_elapsed_time_ms is not None + and elapsed_ms > absolute_max_elapsed_time_ms + ) + return soft_cap_hit or hard_cap_hit + + +def _raise_or_return_after_cap(exception, elapsed_ms, retries, reason): + if isinstance(exception, TemporaryError): + return exception.response + elapsed_seconds = elapsed_ms / 1000 + raise type(exception)( + f"{type(exception).__name__} after {retries + 1} attempts " + f"over {elapsed_seconds:.1f}s{reason}: {exception}" + ) from exception + + def retry_with_backoff( func, initial_interval=500, max_interval=60000, exponent=1.5, max_elapsed_time=3600000, + min_attempts=0, + absolute_max_elapsed_time_ms=None, ): start = round(time.time() * 1000) retries = 0 @@ -168,18 +223,50 @@ def retry_with_backoff( raise exception.inner except Exception as exception: # pylint: disable=broad-exception-caught now = round(time.time() * 1000) - if now - start > max_elapsed_time: - if isinstance(exception, TemporaryError): - return exception.response - - elapsed_seconds = (now - start) / 1000 - raise type(exception)( - f"{type(exception).__name__} after {retries + 1} attempts " - f"over {elapsed_seconds:.1f}s: {exception}" - ) from exception - sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1) - sleep = min(sleep, max_interval / 1000) - time.sleep(sleep) + elapsed = now - start + + if _cap_hit_after_attempt( + elapsed, + retries, + min_attempts, + max_elapsed_time, + absolute_max_elapsed_time_ms, + ): + result = _raise_or_return_after_cap( + exception, elapsed, retries, "" + ) + return result + + sleep_seconds = ( + (initial_interval / 1000) * exponent**retries + + random.uniform(0, 1) + ) + sleep_seconds = min(sleep_seconds, max_interval / 1000) + + if absolute_max_elapsed_time_ms is not None: + projected_elapsed = elapsed + int(sleep_seconds * 1000) + if projected_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + elapsed, + retries, + " (hard cap would be exceeded during backoff)", + ) + return result + + time.sleep(sleep_seconds) + + if absolute_max_elapsed_time_ms is not None: + actual_elapsed = round(time.time() * 1000) - start + if actual_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + actual_elapsed, + retries, + " (hard cap reached during backoff sleep)", + ) + return result + retries += 1 @@ -189,6 +276,8 @@ async def retry_with_backoff_async( max_interval=60000, exponent=1.5, max_elapsed_time=3600000, + min_attempts=0, + absolute_max_elapsed_time_ms=None, ): start = round(time.time() * 1000) retries = 0 @@ -200,16 +289,48 @@ async def retry_with_backoff_async( raise exception.inner except Exception as exception: # pylint: disable=broad-exception-caught now = round(time.time() * 1000) - if now - start > max_elapsed_time: - if isinstance(exception, TemporaryError): - return exception.response - - elapsed_seconds = (now - start) / 1000 - raise type(exception)( - f"{type(exception).__name__} after {retries + 1} attempts " - f"over {elapsed_seconds:.1f}s: {exception}" - ) from exception - sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1) - sleep = min(sleep, max_interval / 1000) - await asyncio.sleep(sleep) + elapsed = now - start + + if _cap_hit_after_attempt( + elapsed, + retries, + min_attempts, + max_elapsed_time, + absolute_max_elapsed_time_ms, + ): + result = _raise_or_return_after_cap( + exception, elapsed, retries, "" + ) + return result + + sleep_seconds = ( + (initial_interval / 1000) * exponent**retries + + random.uniform(0, 1) + ) + sleep_seconds = min(sleep_seconds, max_interval / 1000) + + if absolute_max_elapsed_time_ms is not None: + projected_elapsed = elapsed + int(sleep_seconds * 1000) + if projected_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + elapsed, + retries, + " (hard cap would be exceeded during backoff)", + ) + return result + + await asyncio.sleep(sleep_seconds) + + if absolute_max_elapsed_time_ms is not None: + actual_elapsed = round(time.time() * 1000) - start + if actual_elapsed >= absolute_max_elapsed_time_ms: + result = _raise_or_return_after_cap( + exception, + actual_elapsed, + retries, + " (hard cap reached during backoff sleep)", + ) + return result + retries += 1