
perf(tracing): bounded-concurrency span export #374

Open
smoreinis wants to merge 3 commits into next from stas/tracing-concurrent-egress


@smoreinis smoreinis commented May 29, 2026

Summary

Makes SGP span export concurrent (bounded), removing the per-pod egress ceiling that caused trace backlogs under load. Follow-up to #362 (span queue linger + per-loop keepalive), which fixed ingest latency but left egress serial.

Findings — why egress couldn't keep up

A load test showed span export collapsing under 2× traffic: the agent's PUT rate dropped while the EGP backend sat idle, and the backlog only drained after the run ended. We investigated using the agent logs and Datadog metrics/APM:

  • Not data loss. All spans arrive eventually (aside from a small % lost to backend 401s, which are a separate EGP auth issue, not this code). The DB got every row once the pods drained the backlog post-run.

  • Not CPU. Mock-agent CPU sat at ~8–11% of limit during both the 1× and 2× runs, with zero CFS throttling. The event loop was not saturated.

  • Not TLS / the HTTP client. Export goes over plain http:// to an in-cluster service (egp-api-backend.egp.svc.cluster.local), so connection setup is sub-millisecond and there is no TLS handshake. (The keepalive pool from #362, "perf(tracing): span queue linger + per-loop httpx keepalive", was correct but unused.)

  • It's serial issuance vs. per-request latency. APM shows each PUT /public/v5/spans/batch takes ~150 ms p50 / ~330 ms p95 (server-side batch processing — matches the ~0.5 s batch p99 from the run), in both runs. The drain loop awaits each upsert_batch to completion before sending the next, so one request is in flight per pod at a time:

    per-pod egress ≈ 2 PUTs / (2 × 150 ms) = 1 / 150 ms ≈ 6–7 PUT/s per pod, independent of offered load.

    EGP absorbs ~169 PUT/s in aggregate because it serves requests concurrently; the agent never issued them concurrently, so it couldn't use that capacity. At 2× the per-pod ceiling is exceeded → backlog grows → drains once production stops. This is a Little's Law cap: throughput = concurrency ÷ latency, with concurrency pinned at 1.
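The Little's Law cap above can be checked with a few lines of arithmetic. This is only a sketch: `egress_ceiling` is a hypothetical helper, not part of the codebase, and the latencies are the p50/p95 figures quoted from APM.

```python
def egress_ceiling(concurrency: int, latency_s: float) -> float:
    """Little's Law: sustained throughput (requests/s) = concurrency / latency."""
    if concurrency < 1 or latency_s <= 0:
        raise ValueError("concurrency must be >= 1 and latency_s must be > 0")
    return concurrency / latency_s


# Latencies quoted above for PUT /public/v5/spans/batch (p50 and p95).
for label, latency_s in (("p50", 0.150), ("p95", 0.330)):
    ceiling = egress_ceiling(1, latency_s)
    print(f"{label}: {ceiling:.1f} PUT/s per pod at concurrency=1")
```

At concurrency 1 and 150 ms this gives about 6.7 PUT/s, matching the observed ~6–7 PUT/s ceiling; the only term the agent controls is the concurrency.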

Change

The drain loop now dispatches each batch's export as its own task, bounded by AGENTEX_SPAN_QUEUE_CONCURRENCY (default 8), so multiple upsert_batch requests run at once. With ~150 ms latency this lifts the per-pod ceiling from ~6–7 PUT/s to ~50–70 PUT/s, reusing the keepalive pool #362 added. Setting concurrency=1 restores the old strictly-serial behavior.
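For readers unfamiliar with the pattern, here is a minimal sketch of bounded-concurrency dispatch with asyncio. It is not the code in span_queue.py: `drain`, `demo`, and `fake_send` are hypothetical names, and `fake_send` stands in for the real upsert_batch call with a fixed sleep.

```python
import asyncio


async def drain(batches, send, concurrency=8):
    """Dispatch each batch as its own task, with at most `concurrency` sends in flight."""
    sema = asyncio.Semaphore(concurrency)

    async def run_send(batch):
        async with sema:  # hard cap on simultaneous sends
            await send(batch)

    tasks = [asyncio.create_task(run_send(batch)) for batch in batches]
    await asyncio.gather(*tasks)


async def demo(concurrency, n_batches=16, latency_s=0.01):
    """Return the peak number of overlapping sends observed for a given limit."""
    in_flight = 0
    peak = 0

    async def fake_send(batch):  # assumption: stands in for the real export call
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(latency_s)
        in_flight -= 1

    await drain(range(n_batches), fake_send, concurrency)
    return peak


if __name__ == "__main__":
    print("peak overlap at concurrency=8:", asyncio.run(demo(8)))
    print("peak overlap at concurrency=1:", asyncio.run(demo(1)))
```

With a limit of 1 the semaphore admits one send at a time, which is the strictly serial behavior the old loop had.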

Ordering is preserved

to_request_params() always sends output (null on a START), so a START landing after its END could null out the written output. The change keeps the per-span START-before-END invariant: END send-tasks snapshot the in-flight START tasks and await them before issuing. Because a span's START is always enqueued — and therefore dispatched (FIFO) — before its END, that span's START send is either still in flight (and waited on) or already complete. Independent spans export fully concurrently.
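The barrier can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `OrderedDispatcher`, `dispatch`, `join`, and `fake_send` are hypothetical, while `_inflight_starts` and `_run_send` borrow the names used in the review's sequence diagram. The fake START is deliberately slower than the END so that a missing barrier would reorder them.

```python
import asyncio

START, END = "START", "END"


class OrderedDispatcher:
    def __init__(self, send, concurrency=8):
        self._send = send
        self._sema = asyncio.Semaphore(concurrency)
        self._inflight_starts = set()
        self._tasks = []

    def dispatch(self, kind, span_id):
        # Snapshot at dispatch time: an END waits only for STARTs already in flight.
        barrier = tuple(self._inflight_starts) if kind == END else ()
        task = asyncio.create_task(self._run_send(kind, span_id, barrier))
        self._tasks.append(task)
        if kind == START:
            self._inflight_starts.add(task)
            task.add_done_callback(self._inflight_starts.discard)
        return task

    async def _run_send(self, kind, span_id, barrier):
        if barrier:
            # A failed START must not block its END forever, hence return_exceptions.
            await asyncio.gather(*barrier, return_exceptions=True)
        async with self._sema:
            await self._send(kind, span_id)

    async def join(self):
        await asyncio.gather(*self._tasks)


async def demo():
    log = []

    async def fake_send(kind, span_id):  # assumption: stands in for upsert_batch
        log.append(("begin", kind, span_id))
        await asyncio.sleep(0.03 if kind == START else 0.0)
        log.append(("done", kind, span_id))

    dispatcher = OrderedDispatcher(fake_send)
    dispatcher.dispatch(START, "A")
    dispatcher.dispatch(END, "A")
    await dispatcher.join()
    return log


if __name__ == "__main__":
    for entry in asyncio.run(demo()):
        print(entry)
```

The END for span A does not begin sending until the START for span A is done, even though both tasks were created back to back.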

shutdown() still drains via queue.join() (task_done moved into the send-task) and cancels stragglers on timeout.
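A rough sketch of that shutdown shape, assuming each send-task calls task_done() in a finally block (the PR only says task_done moved into the send-task; the finally placement is an assumption). `shutdown`, `send_task`, and `demo` here are hypothetical stand-ins, not the real methods.

```python
import asyncio


async def send_task(queue, latency_s):
    """Fake send. Assumption: task_done() runs even if the task is cancelled."""
    try:
        await asyncio.sleep(latency_s)
    finally:
        queue.task_done()


async def shutdown(queue, inflight, timeout_s):
    """Drain via queue.join(); on timeout, cancel stragglers and reap them."""
    try:
        await asyncio.wait_for(queue.join(), timeout=timeout_s)
        return True
    except asyncio.TimeoutError:
        stragglers = list(inflight)  # copy: done callbacks mutate the set
        for task in stragglers:
            task.cancel()
        await asyncio.gather(*stragglers, return_exceptions=True)
        return False


async def demo(latency_s, timeout_s):
    queue = asyncio.Queue()
    inflight = set()
    for item in range(4):
        queue.put_nowait(item)
    while not queue.empty():
        queue.get_nowait()
        task = asyncio.create_task(send_task(queue, latency_s))
        inflight.add(task)
        task.add_done_callback(inflight.discard)
    return await shutdown(queue, inflight, timeout_s)


if __name__ == "__main__":
    print("clean drain:", asyncio.run(demo(latency_s=0.01, timeout_s=1.0)))
    print("clean drain:", asyncio.run(demo(latency_s=1.0, timeout_s=0.02)))
```

The return value distinguishes a clean drain from a timed-out one, which is handy in tests; whether the real shutdown() reports this is not stated in the PR.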

Test plan

  • rye run pytest tests/lib/core/tracing/ — 52 passed, 2 skipped
  • rye run ruff check / rye run pyright / ruff format --check — clean
  • New tests cover: bounded concurrency (overlap capped at the limit), concurrency=1 serializes, concurrent drain is >2× faster than serial, and the per-span END-waits-for-START ordering invariant under concurrency.
  • Re-run the 2× load test and confirm egress keeps up with production (no post-run backlog).

Config

| env | default | meaning |
| --- | --- | --- |
| AGENTEX_SPAN_QUEUE_CONCURRENCY | 8 | max in-flight export requests per pod (1 = serial) |
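One plausible way to read this setting is sketched below. `read_span_queue_concurrency` is a hypothetical helper; the fallback to the default on an unparsable value and the clamp to a minimum of 1 are assumptions, not behavior confirmed by the PR.

```python
import os

DEFAULT_CONCURRENCY = 8  # default documented above


def read_span_queue_concurrency(env=None):
    """Read AGENTEX_SPAN_QUEUE_CONCURRENCY from an environment mapping."""
    env = os.environ if env is None else env
    raw = env.get("AGENTEX_SPAN_QUEUE_CONCURRENCY")
    if raw is None:
        return DEFAULT_CONCURRENCY
    try:
        value = int(raw)
    except ValueError:
        # Assumption: an unparsable value falls back to the default.
        return DEFAULT_CONCURRENCY
    # Assumption: values below 1 are clamped to 1 (strictly serial).
    return max(1, value)


if __name__ == "__main__":
    print(read_span_queue_concurrency({"AGENTEX_SPAN_QUEUE_CONCURRENCY": "1"}))
```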

Known limitation

When queue-level retry is enabled (AGENTEX_SPAN_QUEUE_MAX_RETRIES > 1, default 1 = off), a re-enqueued START goes to the back of the queue and may not be in a concurrently-dispatched END's barrier snapshot — a narrow window where a retried START could reorder against its END. Default config is unaffected. Worth tightening if we turn retries on by default.
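The window can be reproduced in a toy model. Everything here is hypothetical (`simulate`, the latencies, the single forced failure); it only mimics the shape of the drain loop, with a START whose first attempt fails and is re-enqueued at the back of the queue.

```python
import asyncio

START, END = "START", "END"
# Hypothetical latencies, chosen so the outcome is deterministic.
LATENCY_S = {START: 0.02, END: 0.01}


async def simulate(fail_first_start):
    """Return the order in which sends for span "A" reach the fake backend."""
    sent = []
    queue = asyncio.Queue()
    inflight_starts = set()
    failed_once = False

    async def run_send(kind, span_id, barrier):
        nonlocal failed_once
        if barrier:
            await asyncio.gather(*barrier, return_exceptions=True)
        await asyncio.sleep(LATENCY_S[kind])
        if kind == START and fail_first_start and not failed_once:
            failed_once = True
            queue.put_nowait((kind, span_id))  # retry goes to the back of the queue
            return
        sent.append((kind, span_id))

    queue.put_nowait((START, "A"))
    queue.put_nowait((END, "A"))
    pending = set()
    while not queue.empty() or pending:
        while not queue.empty():
            kind, span_id = queue.get_nowait()
            # An END snapshots only the START tasks in flight right now.
            barrier = tuple(inflight_starts) if kind == END else ()
            task = asyncio.create_task(run_send(kind, span_id, barrier))
            pending.add(task)
            if kind == START:
                inflight_starts.add(task)
                task.add_done_callback(inflight_starts.discard)
        _, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
    return sent


if __name__ == "__main__":
    print("no failure:  ", asyncio.run(simulate(False)))
    print("with a retry:", asyncio.run(simulate(True)))
```

In this model the END's barrier holds the first START attempt, which completes (unsuccessfully) and releases the END; the retried START is a new task that the snapshot never saw, so it lands after the END.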

Greptile Summary

This PR refactors the span queue drain loop to dispatch each batch's export as an independent asyncio task (bounded by AGENTEX_SPAN_QUEUE_CONCURRENCY, default 8), replacing the previous serial one-at-a-time await. The per-span START-before-END ordering invariant is preserved via a barrier: each END send-task snapshots the currently in-flight START tasks and awaits them before issuing, while a shared semaphore (_send_sema) caps actual concurrent HTTP calls.

  • Concurrency control: _inflight tracks dispatched tasks for backpressure; _send_sema is the hard cap on simultaneous HTTP requests. The drain loop re-checks backpressure before dispatching ENDs, keeping _inflight within the configured bound even when a batch carries both event types.
  • Shutdown improvements: The early-exit guard now also checks whether _inflight is non-empty; on clean shutdown, tasks drain via queue.join(); on timeout, stragglers are cancelled and then awaited with asyncio.gather.
  • Known limitation documented in code: A re-enqueued START (when max_retries > 1) goes to the back of the queue and may miss an END's barrier snapshot — flagged in a NOTE in _reenqueue; benign at the default max_retries=1.
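The backpressure re-check described in the first bullet can be sketched like this. It is a simplified model, not the reviewed code: `Dispatcher`, `drain_batch`, `_wait_for_slot`, and `fake_send` are hypothetical, and the semaphore and ordering barrier are left out so that only the in-flight bound is visible.

```python
import asyncio


class Dispatcher:
    def __init__(self, send, concurrency):
        self._send = send
        self._concurrency = concurrency
        self._inflight = set()
        self.peak_inflight = 0

    async def _wait_for_slot(self):
        # Backpressure gate: wait until fewer than `concurrency` tasks are dispatched.
        while len(self._inflight) >= self._concurrency:
            await asyncio.wait(self._inflight, return_when=asyncio.FIRST_COMPLETED)

    def _dispatch(self, items):
        task = asyncio.create_task(self._send(items))
        self._inflight.add(task)
        task.add_done_callback(self._inflight.discard)
        self.peak_inflight = max(self.peak_inflight, len(self._inflight))

    async def drain_batch(self, starts, ends):
        if starts:
            await self._wait_for_slot()
            self._dispatch(starts)
        if ends:
            # Re-check: without this, a batch with both event types could add
            # two tasks after a single gate check and overshoot the bound.
            await self._wait_for_slot()
            self._dispatch(ends)

    async def join(self):
        while self._inflight:
            await asyncio.wait(self._inflight)


async def demo(concurrency=2, n_batches=3):
    async def fake_send(items):  # assumption: stands in for the real export call
        await asyncio.sleep(0.01)

    dispatcher = Dispatcher(fake_send, concurrency)
    for i in range(n_batches):
        await dispatcher.drain_batch([f"start-{i}"], [f"end-{i}"])
    await dispatcher.join()
    return dispatcher.peak_inflight


if __name__ == "__main__":
    print("peak in-flight tasks:", asyncio.run(demo()))
```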

Confidence Score: 5/5

Safe to merge — the concurrency model, ordering invariant, backpressure gate, and shutdown cleanup are all correctly implemented and covered by new tests.

The drain loop correctly bounds dispatched tasks to _concurrency (the re-check before dispatching ENDs prevents overshoot even when a batch carries both event types), the barrier mechanism faithfully preserves START-before-END ordering, _send_sema is the hard cap on concurrent HTTP calls, and shutdown() drains via queue.join() with correct straggler cancellation on timeout. The known retry-ordering hole is documented in code and is inert at the default max_retries=1.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/agentex/lib/core/tracing/span_queue.py | Core change: drain loop now dispatches send tasks concurrently via asyncio tasks + semaphore. Ordering invariant, backpressure gate, and shutdown cleanup are all correctly implemented. |
| tests/lib/core/tracing/test_span_queue.py | Adds four new tests covering bounded concurrency, serialization at concurrency=1, speed improvement over serial, and the END-waits-for-START ordering invariant. Existing drop-observability test updated to pass concurrency=1 to preserve its serial assumption. |

Sequence Diagram

sequenceDiagram
    participant DL as _drain_loop
    participant D as _dispatch()
    participant T1 as START task (T1)
    participant T2 as END task (T2)
    participant S as _send_sema
    participant EGP as EGP backend

    DL->>DL: "backpressure gate (len(_inflight) < concurrency)"
    DL->>DL: dequeue + linger → batch [START-A, END-A]
    DL->>D: _dispatch(starts, START)
    D->>T1: asyncio.create_task(_run_send)
    D->>D: _inflight.add(T1), _inflight_starts.add(T1)
    DL->>DL: re-check backpressure before ENDs
    DL->>D: _dispatch(ends, END)
    D->>D: "barrier = tuple(_inflight_starts) = (T1,)"
    D->>T2: "asyncio.create_task(_run_send, barrier=(T1,))"
    D->>D: _inflight.add(T2)
    par T1 runs concurrently with T2 barrier wait
        T1->>S: async with _send_sema
        S-->>T1: slot acquired
        T1->>EGP: PUT /spans/batch (START)
        EGP-->>T1: 200 OK
        T1->>S: release slot
        T1->>T1: task_done() x items
        T1->>D: done callback → _inflight.discard, _inflight_starts.discard
    and
        T2->>T2: "await gather(T1, return_exceptions=True)"
        note over T2: blocks until T1 completes
        T2->>S: async with _send_sema
        S-->>T2: slot acquired
        T2->>EGP: PUT /spans/batch (END)
        EGP-->>T2: 200 OK
        T2->>S: release slot
        T2->>T2: task_done() x items
        T2->>D: done callback → _inflight.discard
    end

Comments Outside Diff (1)

  1. src/agentex/lib/core/tracing/span_queue.py, line 343-345 (link)

    P2 The PR description calls out a known ordering hole when max_retries > 1: a retried START goes to the back of the queue and won't be in a concurrently-dispatched END's barrier snapshot. Since this latent bug activates the moment someone enables retries, a code-level note in _reenqueue would make this visible without relying on the PR description.


Reviews (3): Last reviewed commit: "Merge branch 'next' into stas/tracing-co..."

Span export was strictly serial: the drain loop awaited each upsert_batch
to completion before sending the next, so per-pod egress was capped at
~1/request-latency (~150ms server-side ⇒ ~6-7 PUT/s/pod) regardless of CPU
or backend headroom. Under load the queue backlogged and only drained
after the run.

The drain now dispatches each batch as its own task, bounded by
AGENTEX_SPAN_QUEUE_CONCURRENCY (default 8), so multiple upsert_batch
requests are in flight at once and a pod can keep up with span production.

The per-span START-before-END invariant is preserved: END send-tasks
snapshot the in-flight START tasks and await them before issuing. Since a
span's START is always enqueued (and thus dispatched) before its END, that
span's START send is either still in flight (waited on) or already done.
Independent spans export fully concurrently. Setting concurrency=1 restores
the old serial behavior.
Comment thread src/agentex/lib/core/tracing/span_queue.py
- Re-check backpressure before dispatching the END task so a batch
  carrying both event types can't push _inflight past the concurrency
  cap (the semaphore was already the hard limit; this tightens the
  in-flight task bound to match).
- Document the retry-ordering caveat directly in _reenqueue: a
  re-enqueued START goes to the back of the queue and may miss a
  concurrently-dispatched END's barrier snapshot when retries are
  enabled (benign at the default max_retries=1).