diff --git a/CHANGELOG.md b/CHANGELOG.md index 43f4be7..6fbec14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,30 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project uses **CalVer `YY.M.PP`** (PEP 440 may normalise patch numbers for the Python wheel — e.g. `26.06.00` → `26.6.0`). +## [26.6.14] - 2026-06-16 + +### Changed + +- **The bbox-refine post-processing leg now refines a multi-document + extraction's documents concurrently instead of strictly one at a time.** + `BboxRefineWorker._refine_extraction_result` issued one `await` per + document in a sequential loop, so a cross-document extraction (e.g. a cap + table merged across several deeds) paid the *sum* of every document's + OCR + per-page-matcher latency — a 5-document, ~290-page job ran ~700s and + blew the 600s `bbox_refine_timeout_s`, timing out on all 3 attempts and + re-doing every document from scratch each retry. Documents are independent + (each mutates its own `field_groups`) and the refiner already pushes its + CPU-bound word collection to a thread, so the leg now runs them through a + bounded `asyncio.gather`: wall-clock drops from the sum to ~the slowest + document whenever the document count fits within the concurrency cap (beyond it, documents queue for a free slot). A new `FLYDOCS_BBOX_REFINE_DOC_CONCURRENCY` (default `4`) caps the + fan-out (OCR is CPU-bound; each doc multiplies in-flight LLM calls) — set it + per deployment to roughly the pod's CPU limit + 1. The + `gather` is the barrier that keeps "the last document finished" well-defined: + the completion transition + post-processing webhook still fire exactly once, + after every document has settled. Per-job retry / permanent-failure + semantics are unchanged — one document's failure still fails the leg, with + no sibling task left running. 
+ ## [26.6.13] - 2026-06-16 ### Changed diff --git a/env_template b/env_template index 0811544..5830ba6 100644 --- a/env_template +++ b/env_template @@ -153,6 +153,14 @@ FLYDOCS_BBOX_REFINE_TESSERACT_LANG=spa+eng # * ``llm`` -- one LLM call per page over every field. # * ``fuzzy`` -- rapidfuzz only, no LLM fallback. FLYDOCS_BBOX_REFINE_MATCHER=hybrid +# Max documents of one extraction refined concurrently. The refine leg fans +# out a task per document (OCR + per-page matcher), so a multi-document +# extraction no longer runs strictly one doc at a time -- wall-clock drops +# from the sum of per-doc latencies to ~the slowest one. Bounded because OCR +# is CPU-bound and each doc multiplies in-flight LLM calls. +# Set this to roughly the pod's CPU limit + 1 (the +1 keeps the cores busy +# across the LLM-matcher I/O waits); lower it under provider rate limits. +FLYDOCS_BBOX_REFINE_DOC_CONCURRENCY=4 # ---------------------------------------------------------------------------- # Security (optional -- only used if you front the service with an auth gate) diff --git a/pyproject.toml b/pyproject.toml index 78fcd10..017116d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "flydocs" # CalVer YY.MM.PP -- bumped per release. Note that PEP 440 normalises # ``26.05.01`` -> ``26.5.1`` in the built wheel filename. -version = "26.6.13" +version = "26.6.14" description = "Pure-multimodal Intelligent Document Processing service: structured fields + bounding boxes, validation, authenticity checks, LLM judge, and a business-rule engine. Sync + queue-backed async APIs over fireflyframework-pyfly and -agentic. Part of Firefly OperationOS, platform-agnostic by design." readme = "README.md" requires-python = ">=3.13" diff --git a/src/flydocs/__init__.py b/src/flydocs/__init__.py index e02def8..e82b5a2 100644 --- a/src/flydocs/__init__.py +++ b/src/flydocs/__init__.py @@ -24,4 +24,4 @@ `PromptRegistry`). 
""" -__version__ = "26.6.13" +__version__ = "26.6.14" diff --git a/src/flydocs/config.py b/src/flydocs/config.py index b2ef094..9af1dee 100644 --- a/src/flydocs/config.py +++ b/src/flydocs/config.py @@ -78,6 +78,14 @@ class IDPSettings(BaseSettings): # default ceiling is generous. bbox_refine_max_attempts: int = 3 bbox_refine_timeout_s: int = 600 + # Max documents of a single extraction refined concurrently. The refine + # leg fans out one task per document (OCR + per-page matcher), so a + # multi-document extraction no longer runs strictly one doc at a time. + # Bounded because OCR is CPU-bound and each doc multiplies in-flight LLM + # calls. Set this per deployment to roughly the pod's CPU limit + 1 (the + # +1 keeps the cores busy across the LLM-matcher I/O waits); lower it under + # provider rate limits. + bbox_refine_doc_concurrency: int = 4 # -- Extraction ----------------------------------------------------- model: str = "anthropic:claude-sonnet-4-6" diff --git a/src/flydocs/core/services/workers/bbox_refine_worker.py b/src/flydocs/core/services/workers/bbox_refine_worker.py index f36002c..cab1424 100644 --- a/src/flydocs/core/services/workers/bbox_refine_worker.py +++ b/src/flydocs/core/services/workers/bbox_refine_worker.py @@ -314,19 +314,44 @@ async def _refine_extraction_result(self, row: Any) -> ExtractionResult: language_hint = (row.options_json or {}).get("language_hint") - for document in result.documents: + # Refine the documents concurrently rather than strictly one-by-one: + # each document is independent (its own field_groups, mutated in + # place), and the refiner pushes its CPU-bound word collection to a + # thread, so overlapping docs cut the wall-clock from the sum of + # per-doc latencies to ~the slowest one. A semaphore bounds the fan-out + # (OCR is CPU-bound; each doc multiplies in-flight LLM calls). 
+ # ``gather`` is the barrier that makes "the last document finished" + # well-defined: it returns only once every task has settled, so the + # caller fires the completion + webhook exactly once, afterwards. + semaphore = asyncio.Semaphore(max(1, self._settings.bbox_refine_doc_concurrency)) + + async def _refine_one(document: Any) -> None: if not document.field_groups: - continue + return mapped = by_filename.get(document.source_file or "") if mapped is None: - continue - await self._refiner.refine( - document_bytes=mapped.bytes, - media_type=mapped.media_type, - page_count=mapped.page_count, - groups=document.field_groups, - language_hint=language_hint, - ) + return + async with semaphore: + await self._refiner.refine( + document_bytes=mapped.bytes, + media_type=mapped.media_type, + page_count=mapped.page_count, + groups=document.field_groups, + language_hint=language_hint, + ) + + # ``return_exceptions=True`` so one document's failure never strands + # its siblings mid-flight; we await them all (the barrier) and then + # re-raise the first error, preserving the existing per-job + # retry/permanent-failure semantics (and the original exception type + # that ``_is_permanent`` inspects). 
+ outcomes = await asyncio.gather( + *(_refine_one(document) for document in result.documents), + return_exceptions=True, + ) + for outcome in outcomes: + if isinstance(outcome, BaseException): + raise outcome return result def _backoff_delay(self, attempts: int) -> float: diff --git a/tests/unit/test_bbox_refine_worker.py b/tests/unit/test_bbox_refine_worker.py index 59a8fc6..a302bb3 100644 --- a/tests/unit/test_bbox_refine_worker.py +++ b/tests/unit/test_bbox_refine_worker.py @@ -16,6 +16,7 @@ from __future__ import annotations +import asyncio import base64 import io from dataclasses import dataclass, field @@ -283,3 +284,153 @@ async def test_permanent_error_marks_failed_no_republish() -> None: names = [name for name, _ in repo.calls] assert "fail_bbox_refinement" in names assert publisher.published == [] # never republish on permanent + + +# ----------------------------------------------- parallel per-document refine + + +class _RecordingRefiner: + """Refiner double that records how many ``refine`` calls overlap. + + Each call increments a live counter, yields the event loop while + "working", then decrements -- so ``max_active`` captures the peak + concurrency the worker actually drove across the documents. 
+ """ + + def __init__(self) -> None: + self.active = 0 + self.max_active = 0 + self.calls = 0 + self.completed = 0 + + async def refine(self, *, groups: Any, **_: Any) -> None: + self.calls += 1 + self.active += 1 + self.max_active = max(self.max_active, self.active) + try: + await asyncio.sleep(0.05) + finally: + self.active -= 1 + self.completed += 1 + + +def _result_with_n_docs(n: int) -> ExtractionResult: + docs = [] + for i in range(n): + field_ = ExtractedField( + name="customer_name", + value="Acme Corporation", + pages=[1], + bbox=BoundingBox(xmin=0.05, ymin=0.05, xmax=0.95, ymax=0.95), + ) + group = ExtractedFieldGroup(name="customer", fields=[field_]) + docs.append(Document(type="invoice", pages=[1], field_groups=[group], source_file=f"doc{i}.pdf")) + return ExtractionResult( + id="ext_RESULT0000000000000000000000", + files=[], + documents=docs, + pipeline=PipelineMeta(model="anthropic:claude-sonnet-4-6", latency_ms=1000), + ) + + +def _ext_with_n_docs(n: int) -> _StubExtraction: + pdf = _real_pdf() + files = [ + { + "filename": f"doc{i}.pdf", + "content_base64": base64.b64encode(pdf).decode(), + "content_type": "application/pdf", + } + for i in range(n) + ] + return _StubExtraction( + schema_json={"files": files}, + result_json=_result_with_n_docs(n).model_dump(mode="json", by_alias=True), + ) + + +def _worker_with(repo: _StubRepo, refiner: Any, settings: IDPSettings) -> BboxRefineWorker: + return BboxRefineWorker( + repository=repo, # type: ignore[arg-type] + event_publisher=_StubPublisher(), # type: ignore[arg-type] + webhook=_StubWebhook(), # type: ignore[arg-type] + normalizer=_make_normalizer(), + refiner=refiner, + settings=settings, + ) + + +@pytest.mark.asyncio +async def test_documents_are_refined_concurrently() -> None: + """Several documents in one extraction refine in parallel, not one-by-one.""" + refiner = _RecordingRefiner() + repo = _StubRepo(_ext_with_n_docs(3)) + worker = _worker_with(repo, refiner, IDPSettings()) + + await 
worker._process(repo.ext.id) + + assert refiner.calls == 3 + assert refiner.max_active > 1 # sequential code peaks at 1 + + +@pytest.mark.asyncio +async def test_document_concurrency_is_capped_by_setting() -> None: + """Concurrency never exceeds ``bbox_refine_doc_concurrency`` but does reach it.""" + refiner = _RecordingRefiner() + repo = _StubRepo(_ext_with_n_docs(4)) + worker = _worker_with(repo, refiner, IDPSettings(bbox_refine_doc_concurrency=2)) + + await worker._process(repo.ext.id) + + assert refiner.calls == 4 + assert refiner.max_active == 2 # capped at the limit, and reaches it + + +@pytest.mark.asyncio +async def test_completion_waits_for_every_document_before_callback() -> None: + """The barrier guarantee: the bbox leg is only completed once EVERY + document has finished refining -- so the post-processing callback fires + exactly once, after the last document.""" + refiner = _RecordingRefiner() + repo = _StubRepo(_ext_with_n_docs(5)) + worker = _worker_with(repo, refiner, IDPSettings()) + + await worker._process(repo.ext.id) + + # All five refined, and completion happened (once) only after all of them. 
+ assert refiner.completed == 5 + complete_calls = [name for name, _ in repo.calls if name == "complete_bbox_refinement"] + assert complete_calls == ["complete_bbox_refinement"] + assert repo.ext.post_processing_bbox_status == "succeeded" + + +class _FlakyRefiner(_RecordingRefiner): + """Refines normally but raises on the document whose group is poisoned.""" + + async def refine(self, *, groups: Any, **kw: Any) -> None: + await super().refine(groups=groups, **kw) + if groups and groups[0].name == "boom": + raise RuntimeError("refiner exploded") + + +@pytest.mark.asyncio +async def test_one_document_failure_fails_the_job_without_stranding_siblings() -> None: + """A refine error still fails the leg (so it retries / goes permanent), + and every sibling document settles first -- the barrier leaves no task + running after we leave.""" + refiner = _FlakyRefiner() + repo = _StubRepo(_ext_with_n_docs(3)) + result = ExtractionResult.model_validate(repo.ext.result_json) + result.documents[1].field_groups[0].name = "boom" # poison the middle doc + repo.ext.result_json = result.model_dump(mode="json", by_alias=True) + worker = _worker_with(repo, refiner, IDPSettings()) + + await worker._process(repo.ext.id) + + # All three were awaited (no orphaned in-flight task), but the leg failed. + assert refiner.completed == 3 + assert refiner.active == 0 + names = [name for name, _ in repo.calls] + assert "complete_bbox_refinement" not in names + # First attempt of 3 -> retryable RuntimeError -> republished, not failed. + assert "fail_bbox_refinement" not in names diff --git a/uv.lock b/uv.lock index 2b04a17..7e822a1 100644 --- a/uv.lock +++ b/uv.lock @@ -1483,7 +1483,7 @@ security = [ [[package]] name = "flydocs" -version = "26.6.13" +version = "26.6.14" source = { editable = "." } dependencies = [ { name = "aiosqlite" },