diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index f9105718b..a337786bf 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -20,6 +20,12 @@ _DEFAULT_MAX_SIZE = 0 # Total attempts per batch for a *transient* failure (1 == no retry). _DEFAULT_MAX_RETRIES = 1 +# Max number of batch-export HTTP requests in flight at once. The export +# backend (EGP) processes each upsert_batch in ~150ms but serves many requests +# concurrently; issuing one batch at a time caps per-pod egress at ~1/latency. +# Sending several concurrently lets a pod keep up with span production under +# load. ``1`` restores the old strictly-serial behavior. +_DEFAULT_CONCURRENCY = 8 # HTTP statuses worth retrying at the queue level. These are explicit # backpressure / transient signals; everything else (esp. 401/403/4xx auth and # validation errors) is a permanent failure that re-enqueuing cannot fix. Note @@ -76,15 +82,23 @@ class AsyncSpanQueue: """Background FIFO queue for async span processing. Span events are enqueued synchronously (non-blocking) and drained by a - background task. Items are processed in batches: all START events in a - batch are flushed concurrently, then all END events, so that per-span - start-before-end ordering is preserved while HTTP calls for independent - spans execute in parallel. - - Once the drain loop picks up the first item, it lingers up to - ``linger_ms`` waiting for more items to coalesce into the same batch. - Without the linger the drain almost always returned size-1 batches under - real agent workloads, because spans typically arrive a few ms apart. + background task. The drain coalesces ready events into batches and + *dispatches* each batch's export as its own task, so up to ``concurrency`` + batch requests can be in flight at once. 
This matters because each + ``upsert_batch`` HTTP call takes tens-to-hundreds of ms server-side; issuing + them one at a time caps a pod's egress at ~1/latency and lets a backlog + build under load. + + Ordering guarantee (first attempts only; see the retry caveat on ``_reenqueue``): a span's + START export completes before its END export is issued. END batches wait on the START + batches that were in flight when they were formed; because a span's START is always + enqueued before its END, that span's START send is either still in flight (and waited + on) or already finished. Independent spans export fully concurrently. + + Once the drain loop picks up the first item, it lingers up to ``linger_ms`` + waiting for more items to coalesce into the same batch. Without the linger + the drain almost always returned size-1 batches under real agent workloads, + because spans typically arrive a few ms apart. Reliability: - ``max_size`` bounds the queue. When full, new events are dropped and @@ -101,6 +115,7 @@ def __init__( linger_ms: int | None = None, max_size: int | None = None, max_retries: int | None = None, + concurrency: int | None = None, ) -> None: resolved_max_size = ( _read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size) @@ -115,6 +130,17 @@ def __init__( if max_retries is None else max(1, max_retries) ) + self._concurrency = ( + _read_int_env("AGENTEX_SPAN_QUEUE_CONCURRENCY", _DEFAULT_CONCURRENCY, minimum=1) + if concurrency is None + else max(1, concurrency) + ) + # Bounds concurrent export HTTP requests. + self._send_sema = asyncio.Semaphore(self._concurrency) + # Outstanding dispatched send tasks, and the subset that are START + # sends (END sends wait on these to preserve per-span ordering). + self._inflight: set[asyncio.Task[None]] = set() + self._inflight_starts: set[asyncio.Task[None]] = set() # Total spans dropped for any reason (full queue, shutdown, permanent # failure, exhausted retries).
Surfaced for metrics/observability so # span loss stops being silent. @@ -169,6 +195,11 @@ def _ensure_drain_running(self) -> None: async def _drain_loop(self) -> None: while True: + # Backpressure: cap the number of in-flight send tasks so the drain + # does not run unboundedly ahead of the exporters. + while len(self._inflight) >= self._concurrency: + await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED) + # Block until at least one item is available. first = await self._queue.get() batch: list[_SpanQueueItem] = [first] @@ -196,22 +227,47 @@ async def _drain_loop(self) -> None: except asyncio.QueueEmpty: break - try: - # Separate START and END events. Processing all STARTs before - # ENDs ensures that on_span_start completes before on_span_end - # for any span whose both events land in the same batch. - starts = [i for i in batch if i.event_type == SpanEventType.START] - ends = [i for i in batch if i.event_type == SpanEventType.END] - - if starts: - await self._process_items(starts) - if ends: - await self._process_items(ends) - finally: - for _ in batch: - self._queue.task_done() - # Release span data for GC. - batch.clear() + # Separate START and END events and dispatch each as its own send + # task. Dispatching STARTs first (so they are registered before the + # END snapshot) guarantees an END never outruns a START of the same + # span whose events land in this batch. + starts = [i for i in batch if i.event_type == SpanEventType.START] + ends = [i for i in batch if i.event_type == SpanEventType.END] + if starts: + self._dispatch(starts, SpanEventType.START) + if ends: + # Re-check backpressure before the second dispatch so a batch + # carrying both event types can't push _inflight past the cap. 
+ while len(self._inflight) >= self._concurrency: + await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED) + self._dispatch(ends, SpanEventType.END) + + def _dispatch(self, items: list[_SpanQueueItem], event_type: SpanEventType) -> None: + """Spawn a background task to export ``items``. + + END sends snapshot the currently in-flight START tasks and wait for them + before issuing, preserving the per-span START-before-END invariant. + """ + barrier = tuple(self._inflight_starts) if event_type == SpanEventType.END else () + task = asyncio.create_task(self._run_send(items, barrier)) + self._inflight.add(task) + task.add_done_callback(self._inflight.discard) + if event_type == SpanEventType.START: + self._inflight_starts.add(task) + task.add_done_callback(self._inflight_starts.discard) + + async def _run_send(self, items: list[_SpanQueueItem], barrier: tuple[asyncio.Task[None], ...]) -> None: + try: + if barrier: + # Wait for the START sends this END batch depends on. Their + # exceptions are irrelevant here — we only need them finished. + await asyncio.gather(*barrier, return_exceptions=True) + await self._process_items(items) + finally: + # Mark every item done so shutdown's queue.join() can complete only + # once all sends (and their retries) have finished. + for _ in items: + self._queue.task_done() async def _process_items(self, items: list[_SpanQueueItem]) -> None: """Dispatch a batch of same-event-type items to each processor in one call. @@ -243,10 +299,12 @@ async def _handle( ) -> None: spans = [item.span for item in items] try: - if event_type == SpanEventType.START: - await p.on_spans_start(spans) - else: - await p.on_spans_end(spans) + # Hold a concurrency slot only for the duration of the HTTP call. 
+ async with self._send_sema: + if event_type == SpanEventType.START: + await p.on_spans_start(spans) + else: + await p.on_spans_end(spans) except Exception as exc: self._handle_failure(p, items, event_type, exc) @@ -288,7 +346,14 @@ def _handle_failure( def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: """Put a single failed item back on the queue, scoped to the processor - that failed, with an incremented attempt count.""" + that failed, with an incremented attempt count. + + NOTE: a re-enqueued START goes to the *back* of the queue. If an END + for the same span is dispatched concurrently before this START is picked + up again, the END's barrier snapshot won't contain it, breaking the + START-before-END guarantee for that span. This is benign at the default + ``max_retries=1`` (retries disabled) but must be addressed before + enabling retries by default.""" try: self._queue.put_nowait( _SpanQueueItem( @@ -307,14 +372,21 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: async def shutdown(self, timeout: float = 30.0) -> None: self._stopping = True - if self._queue.empty() and (self._drain_task is None or self._drain_task.done()): + drain_idle = self._drain_task is None or self._drain_task.done() + if self._queue.empty() and drain_idle and not self._inflight: return + + timed_out = False try: + # join() returns once every enqueued (and re-enqueued) item has been + # marked done by its send task. await asyncio.wait_for(self._queue.join(), timeout=timeout) except asyncio.TimeoutError: + timed_out = True logger.warning( "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize() ) + if self._drain_task is not None and not self._drain_task.done(): self._drain_task.cancel() try: @@ -322,6 +394,15 @@ async def shutdown(self, timeout: float = 30.0) -> None: except asyncio.CancelledError: pass + # Clean up any in-flight send tasks. 
On a clean shutdown these are + # already finishing; on timeout, cancel the stragglers so we don't hang. + inflight = list(self._inflight) + if inflight: + if timed_out: + for task in inflight: + task.cancel() + await asyncio.gather(*inflight, return_exceptions=True) + _default_span_queue: AsyncSpanQueue | None = None diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index 2e68cf88d..eee513ac9 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -425,9 +425,10 @@ async def block_first(spans: list[Span]) -> None: proc.on_spans_start = AsyncMock(side_effect=block_first) proc.on_spans_end = AsyncMock() - # max_size=1, no linger: the drain pulls item-0 and blocks; item-1 fills - # the queue; items 2 and 3 are dropped. - queue = AsyncSpanQueue(max_size=1, linger_ms=0) + # max_size=1, no linger, concurrency=1: the drain dispatches item-0 and + # then blocks at the in-flight cap; item-1 fills the queue; items 2 and 3 + # are dropped. + queue = AsyncSpanQueue(max_size=1, linger_ms=0, concurrency=1) queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) await asyncio.sleep(0.02) # let the drain pick up s0 and block @@ -536,6 +537,118 @@ async def always_503(spans: list[Span]) -> None: assert queue.dropped_spans == 1 +class TestAsyncSpanQueueConcurrency: + """Span export should issue multiple batch requests concurrently (bounded), + so per-pod egress isn't capped at one in-flight request — while still + guaranteeing a span's START send completes before its END send. 
+ """ + + async def test_batches_dispatched_concurrently_up_to_bound(self): + current = 0 + max_seen = 0 + lock = asyncio.Lock() + + async def slow_start(spans: list[Span]) -> None: + nonlocal current, max_seen + async with lock: + current += 1 + max_seen = max(max_seen, current) + await asyncio.sleep(0.05) + async with lock: + current -= 1 + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=slow_start) + proc.on_spans_end = AsyncMock() + + # batch_size=1 → each span is its own batch/send; concurrency=4 caps + # simultaneous in-flight sends. + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4) + for i in range(8): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + + await queue.shutdown() + + assert proc.on_spans_start.call_count == 8 + assert 2 <= max_seen <= 4, f"expected bounded concurrency (2..4), saw {max_seen}" + + async def test_concurrency_one_serializes(self): + current = 0 + max_seen = 0 + lock = asyncio.Lock() + + async def slow_start(spans: list[Span]) -> None: + nonlocal current, max_seen + async with lock: + current += 1 + max_seen = max(max_seen, current) + await asyncio.sleep(0.03) + async with lock: + current -= 1 + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=slow_start) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=1) + for i in range(4): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + + await queue.shutdown() + + assert max_seen == 1, f"concurrency=1 must serialize sends, saw {max_seen}" + + async def test_concurrent_is_faster_than_serial(self): + async def slow_start(spans: list[Span]) -> None: + await asyncio.sleep(0.05) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=slow_start) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=8) + for i in range(8): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + + start = 
time.monotonic() + await queue.shutdown() + elapsed = time.monotonic() - start + + serial = 8 * 0.05 + assert elapsed < serial * 0.5, f"concurrent drain took {elapsed:.3f}s; serial would be {serial:.3f}s" + + async def test_end_waits_for_start_of_same_span(self): + """The per-span ordering invariant: a span's END upsert must not be sent + until its START upsert has completed, even with concurrency enabled.""" + log: list[tuple[str, str]] = [] + + async def on_start(spans: list[Span]) -> None: + log.append(("start_enter", spans[0].id)) + await asyncio.sleep(0.05) + log.append(("start_exit", spans[0].id)) + + async def on_end(spans: list[Span]) -> None: + log.append(("end_enter", spans[0].id)) + await asyncio.sleep(0.01) + log.append(("end_exit", spans[0].id)) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=on_start) + proc.on_spans_end = AsyncMock(side_effect=on_end) + + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4) + queue.enqueue(SpanEventType.START, _make_span("A"), [proc]) + await asyncio.sleep(0.01) # let the START send begin (and block on sleep) + queue.enqueue(SpanEventType.END, _make_span("A"), [proc]) + + await queue.shutdown() + + # END must not enter until START has exited for the same span. + start_exit = log.index(("start_exit", "A")) + end_enter = log.index(("end_enter", "A")) + assert start_exit < end_enter, f"END began before START completed: {log}" + + class TestAsyncSpanQueueIntegration: async def test_integration_with_async_trace(self): call_log: list[tuple[str, str]] = []