Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,30 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project uses **CalVer `YY.M.PP`** (PEP 440 may normalise patch numbers
for the Python wheel — e.g. `26.06.00` → `26.6.0`).

## [26.6.14] - 2026-06-16

### Changed

- **The bbox-refine post-processing leg now refines a multi-document
extraction's documents concurrently instead of strictly one at a time.**
`BboxRefineWorker._refine_extraction_result` fanned out one `await` per
document in a sequential loop, so a cross-document extraction (e.g. a cap
table merged across several deeds) paid the *sum* of every document's
OCR + per-page-matcher latency — a 5-document, ~290-page job ran ~700s and
blew the 600s `bbox_refine_timeout_s`, timing out on all 3 attempts and
re-doing every document from scratch each retry. Documents are independent
(each mutates its own `field_groups`) and the refiner already pushes its
CPU-bound word collection to a thread, so the leg now runs them through a
bounded `asyncio.gather`: wall-clock drops from the sum to ~the slowest
document. A new `FLYDOCS_BBOX_REFINE_DOC_CONCURRENCY` (default `4`) caps the
fan-out (OCR is CPU-bound; each doc multiplies in-flight LLM calls) — set it
per deployment to roughly the pod's CPU limit + 1. The
`gather` is the barrier that keeps "the last document finished" well-defined:
the completion transition + post-processing webhook still fire exactly once,
after every document has settled. Per-job retry / permanent-failure
semantics are unchanged — one document's failure still fails the leg, with
no sibling task left running.

## [26.6.13] - 2026-06-16

### Changed
Expand Down
8 changes: 8 additions & 0 deletions env_template
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ FLYDOCS_BBOX_REFINE_TESSERACT_LANG=spa+eng
# * ``llm`` -- one LLM call per page over every field.
# * ``fuzzy`` -- rapidfuzz only, no LLM fallback.
FLYDOCS_BBOX_REFINE_MATCHER=hybrid
# Max documents of one extraction refined concurrently. The refine leg fans
# out a task per document (OCR + per-page matcher), so a multi-document
# extraction no longer runs strictly one doc at a time -- wall-clock drops
# from the sum of per-doc latencies to ~the slowest one. Bounded because OCR
# is CPU-bound and each doc multiplies in-flight LLM calls.
# Set this to roughly the pod's CPU limit + 1 (the +1 keeps the cores busy
# across the LLM-matcher I/O waits); lower it under provider rate limits.
FLYDOCS_BBOX_REFINE_DOC_CONCURRENCY=4

# ----------------------------------------------------------------------------
# Security (optional -- only used if you front the service with an auth gate)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "flydocs"
# CalVer YY.MM.PP -- bumped per release. Note that PEP 440 normalises
# ``26.05.01`` -> ``26.5.1`` in the built wheel filename.
version = "26.6.13"
version = "26.6.14"
description = "Pure-multimodal Intelligent Document Processing service: structured fields + bounding boxes, validation, authenticity checks, LLM judge, and a business-rule engine. Sync + queue-backed async APIs over fireflyframework-pyfly and -agentic. Part of Firefly OperationOS, platform-agnostic by design."
readme = "README.md"
requires-python = ">=3.13"
Expand Down
2 changes: 1 addition & 1 deletion src/flydocs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
`PromptRegistry`).
"""

__version__ = "26.6.13"
__version__ = "26.6.14"
8 changes: 8 additions & 0 deletions src/flydocs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ class IDPSettings(BaseSettings):
# default ceiling is generous.
bbox_refine_max_attempts: int = 3
bbox_refine_timeout_s: int = 600
# Max documents of a single extraction refined concurrently. The refine
# leg fans out one task per document (OCR + per-page matcher), so a
# multi-document extraction no longer runs strictly one doc at a time.
# Bounded because OCR is CPU-bound and each doc multiplies in-flight LLM
# calls. Set this per deployment to roughly the pod's CPU limit + 1 (the
# +1 keeps the cores busy across the LLM-matcher I/O waits); lower it under
# provider rate limits.
bbox_refine_doc_concurrency: int = 4

# -- Extraction -----------------------------------------------------
model: str = "anthropic:claude-sonnet-4-6"
Expand Down
45 changes: 35 additions & 10 deletions src/flydocs/core/services/workers/bbox_refine_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,44 @@ async def _refine_extraction_result(self, row: Any) -> ExtractionResult:

language_hint = (row.options_json or {}).get("language_hint")

for document in result.documents:
# Refine the documents concurrently rather than strictly one-by-one:
# each document is independent (its own field_groups, mutated in
# place), and the refiner pushes its CPU-bound word collection to a
# thread, so overlapping docs cut the wall-clock from the sum of
# per-doc latencies to ~the slowest one. A semaphore bounds the fan-out
# (OCR is CPU-bound; each doc multiplies in-flight LLM calls).
# ``gather`` is the barrier that makes "the last document finished"
# well-defined: it returns only once every task has settled, so the
# caller fires the completion + webhook exactly once, afterwards.
semaphore = asyncio.Semaphore(max(1, self._settings.bbox_refine_doc_concurrency))

async def _refine_one(document: Any) -> None:
if not document.field_groups:
continue
return
mapped = by_filename.get(document.source_file or "")
if mapped is None:
continue
await self._refiner.refine(
document_bytes=mapped.bytes,
media_type=mapped.media_type,
page_count=mapped.page_count,
groups=document.field_groups,
language_hint=language_hint,
)
return
async with semaphore:
await self._refiner.refine(
document_bytes=mapped.bytes,
media_type=mapped.media_type,
page_count=mapped.page_count,
groups=document.field_groups,
language_hint=language_hint,
)

# ``return_exceptions=True`` so one document's failure never strands
# its siblings mid-flight; we await them all (the barrier) and then
# re-raise the first error, preserving the existing per-job
# retry/permanent-failure semantics (and the original exception type
# that ``_is_permanent`` inspects).
outcomes = await asyncio.gather(
*(_refine_one(document) for document in result.documents),
return_exceptions=True,
)
for outcome in outcomes:
if isinstance(outcome, BaseException):
raise outcome
return result

def _backoff_delay(self, attempts: int) -> float:
Expand Down
151 changes: 151 additions & 0 deletions tests/unit/test_bbox_refine_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import asyncio
import base64
import io
from dataclasses import dataclass, field
Expand Down Expand Up @@ -283,3 +284,153 @@ async def test_permanent_error_marks_failed_no_republish() -> None:
names = [name for name, _ in repo.calls]
assert "fail_bbox_refinement" in names
assert publisher.published == [] # never republish on permanent


# ----------------------------------------------- parallel per-document refine


class _RecordingRefiner:
"""Refiner double that records how many ``refine`` calls overlap.

Each call increments a live counter, yields the event loop while
"working", then decrements -- so ``max_active`` captures the peak
concurrency the worker actually drove across the documents.
"""

def __init__(self) -> None:
self.active = 0
self.max_active = 0
self.calls = 0
self.completed = 0

async def refine(self, *, groups: Any, **_: Any) -> None:
self.calls += 1
self.active += 1
self.max_active = max(self.max_active, self.active)
try:
await asyncio.sleep(0.05)
finally:
self.active -= 1
self.completed += 1


def _result_with_n_docs(n: int) -> ExtractionResult:
docs = []
for i in range(n):
field_ = ExtractedField(
name="customer_name",
value="Acme Corporation",
pages=[1],
bbox=BoundingBox(xmin=0.05, ymin=0.05, xmax=0.95, ymax=0.95),
)
group = ExtractedFieldGroup(name="customer", fields=[field_])
docs.append(Document(type="invoice", pages=[1], field_groups=[group], source_file=f"doc{i}.pdf"))
return ExtractionResult(
id="ext_RESULT0000000000000000000000",
files=[],
documents=docs,
pipeline=PipelineMeta(model="anthropic:claude-sonnet-4-6", latency_ms=1000),
)


def _ext_with_n_docs(n: int) -> _StubExtraction:
pdf = _real_pdf()
files = [
{
"filename": f"doc{i}.pdf",
"content_base64": base64.b64encode(pdf).decode(),
"content_type": "application/pdf",
}
for i in range(n)
]
return _StubExtraction(
schema_json={"files": files},
result_json=_result_with_n_docs(n).model_dump(mode="json", by_alias=True),
)


def _worker_with(repo: _StubRepo, refiner: Any, settings: IDPSettings) -> BboxRefineWorker:
return BboxRefineWorker(
repository=repo, # type: ignore[arg-type]
event_publisher=_StubPublisher(), # type: ignore[arg-type]
webhook=_StubWebhook(), # type: ignore[arg-type]
normalizer=_make_normalizer(),
refiner=refiner,
settings=settings,
)


@pytest.mark.asyncio
async def test_documents_are_refined_concurrently() -> None:
"""Several documents in one extraction refine in parallel, not one-by-one."""
refiner = _RecordingRefiner()
repo = _StubRepo(_ext_with_n_docs(3))
worker = _worker_with(repo, refiner, IDPSettings())

await worker._process(repo.ext.id)

assert refiner.calls == 3
assert refiner.max_active > 1 # sequential code peaks at 1


@pytest.mark.asyncio
async def test_document_concurrency_is_capped_by_setting() -> None:
"""Concurrency never exceeds ``bbox_refine_doc_concurrency`` but does reach it."""
refiner = _RecordingRefiner()
repo = _StubRepo(_ext_with_n_docs(4))
worker = _worker_with(repo, refiner, IDPSettings(bbox_refine_doc_concurrency=2))

await worker._process(repo.ext.id)

assert refiner.calls == 4
assert refiner.max_active == 2 # capped at the limit, and reaches it


@pytest.mark.asyncio
async def test_completion_waits_for_every_document_before_callback() -> None:
"""The barrier guarantee: the bbox leg is only completed once EVERY
document has finished refining -- so the post-processing callback fires
exactly once, after the last document."""
refiner = _RecordingRefiner()
repo = _StubRepo(_ext_with_n_docs(5))
worker = _worker_with(repo, refiner, IDPSettings())

await worker._process(repo.ext.id)

# All five refined, and completion happened (once) only after all of them.
assert refiner.completed == 5
complete_calls = [name for name, _ in repo.calls if name == "complete_bbox_refinement"]
assert complete_calls == ["complete_bbox_refinement"]
assert repo.ext.post_processing_bbox_status == "succeeded"


class _FlakyRefiner(_RecordingRefiner):
"""Refines normally but raises on the document whose group is poisoned."""

async def refine(self, *, groups: Any, **kw: Any) -> None:
await super().refine(groups=groups, **kw)
if groups and groups[0].name == "boom":
raise RuntimeError("refiner exploded")


@pytest.mark.asyncio
async def test_one_document_failure_fails_the_job_without_stranding_siblings() -> None:
"""A refine error still fails the leg (so it retries / goes permanent),
and every sibling document settles first -- the barrier leaves no task
running after we leave."""
refiner = _FlakyRefiner()
repo = _StubRepo(_ext_with_n_docs(3))
result = ExtractionResult.model_validate(repo.ext.result_json)
result.documents[1].field_groups[0].name = "boom" # poison the middle doc
repo.ext.result_json = result.model_dump(mode="json", by_alias=True)
worker = _worker_with(repo, refiner, IDPSettings())

await worker._process(repo.ext.id)

# All three were awaited (no orphaned in-flight task), but the leg failed.
assert refiner.completed == 3
assert refiner.active == 0
names = [name for name, _ in repo.calls]
assert "complete_bbox_refinement" not in names
# First attempt of 3 -> retryable RuntimeError -> republished, not failed.
assert "fail_bbox_refinement" not in names
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading