diff --git a/src/bedrock_agentcore/runtime/app.py b/src/bedrock_agentcore/runtime/app.py
index 403f12da..f2e596be 100644
--- a/src/bedrock_agentcore/runtime/app.py
+++ b/src/bedrock_agentcore/runtime/app.py
@@ -4,6 +4,7 @@
 """
 
 import asyncio
+import contextlib
 import contextvars
 import functools
 import inspect
@@ -725,29 +726,83 @@ def _async_gen_to_sync_gen(self, async_gen: Any, ctx: contextvars.Context) -> An
         a thread-safe queue and yielded synchronously. Starlette's StreamingResponse
         iterates this sync generator via iterate_in_threadpool, so the main
         event loop is never blocked.
+
+        The producer must never block the shared worker loop, or a full queue
+        would freeze it and starve every other session. When the consumer stops
+        (SSE client disconnect), the producer task is cancelled so it is torn
+        down even while parked awaiting a slow downstream, and a stop event
+        keeps a queue-full producer from spinning before the cancel lands.
         """
         worker_loop = self._ensure_worker_loop()
         q: queue.Queue = queue.Queue(maxsize=100)
         _DONE = object()
+        stop = threading.Event()
+        task_cell: dict[str, asyncio.Task] = {}
 
         async def _produce() -> None:
+            """Drain ``async_gen`` into the queue from a task on the worker loop."""
-            _restore_context(ctx)
             try:
+                # Restore the captured context inside the try: if it raises, the error
+                # is forwarded to the consumer instead of leaving its q.get() blocked.
+                _restore_context(ctx)
                 async for chunk in async_gen:
-                    q.put((True, chunk))
-                q.put((True, _DONE))
+                    await _offer((True, chunk))
+                    if stop.is_set():
+                        return
+                await _offer((True, _DONE))
+            except asyncio.CancelledError:
+                # Consumer disconnected; nothing reads the queue anymore. Let the
+                # cancellation unwind so the finally below still closes the source.
+                raise
             except BaseException as e:
-                q.put((False, e))
+                await _offer((False, e))
+            finally:
+                # Ensure the source generator releases its resources if the
+                # consumer disconnected mid-stream.
+                aclose = getattr(async_gen, "aclose", None)
+                if aclose is not None:
+                    with contextlib.suppress(Exception):
+                        await aclose()
+
+        async def _offer(item: Any) -> None:
+            """Put an item without ever blocking the worker loop.
+
+            Retries with a short async sleep while the queue is full so the loop
+            stays free to run other coroutines. Bails out if the consumer stopped.
+            """
+            while not stop.is_set():
+                try:
+                    q.put_nowait(item)
+                    return
+                except queue.Full:
+                    await asyncio.sleep(0.01)
+
+        def _start() -> None:
+            task_cell["task"] = worker_loop.create_task(_produce())
 
-        worker_loop.call_soon_threadsafe(lambda: worker_loop.create_task(_produce()))
+        worker_loop.call_soon_threadsafe(_start)
 
-        while True:
-            ok, value = q.get()
-            if not ok:
-                raise value
-            if value is _DONE:
-                break
-            yield value
+        try:
+            while True:
+                ok, value = q.get()
+                if not ok:
+                    raise value
+                if value is _DONE:
+                    break
+                yield value
+        finally:
+            # Consumer is done or the client disconnected (GeneratorExit). Set the
+            # stop flag (unblocks a queue-full producer that hasn't started its
+            # await yet) and cancel the task so a producer parked in a mid-chunk
+            # await is torn down immediately instead of leaking until it returns.
+            stop.set()
+
+            def _cancel() -> None:
+                task = task_cell.get("task")
+                if task is not None and not task.done():
+                    task.cancel()
+
+            worker_loop.call_soon_threadsafe(_cancel)
 
     async def _invoke_handler(self, handler: Callable, request_context: Any, takes_context: bool, payload: Any) -> Any:
         """Dispatch handler execution based on handler type.
diff --git a/tests/bedrock_agentcore/runtime/test_app.py b/tests/bedrock_agentcore/runtime/test_app.py
index bba49b25..edc0ff6a 100644
--- a/tests/bedrock_agentcore/runtime/test_app.py
+++ b/tests/bedrock_agentcore/runtime/test_app.py
@@ -1,5 +1,7 @@
 import asyncio
+import concurrent.futures
 import contextlib
+import contextvars
 import json
 import os
 import threading
@@ -3061,3 +3063,134 @@ def test_x_amzn_prefix_is_blocked(self):
     def test_authorization_is_allowed(self):
         # Authorization is normalized to canonical casing by the loop, but is_forwardable_header must not block it
         assert is_forwardable_header("Authorization") is True
+
+
+class TestStreamingBridgeDeadlock:
+    """Regression tests for the async->sync streaming bridge deadlock.
+
+    When a streaming (async generator) handler produces faster than the SSE
+    consumer drains, the bounded queue used by ``_async_gen_to_sync_gen`` fills
+    up. The producer coroutine runs on the single shared worker loop, so a
+    blocking put on a full queue freezes that loop process-wide and starves
+    every other session's handler. If the consumer goes away entirely (client
+    disconnect), the producer must be torn down rather than left spinning.
+    """
+
+    @staticmethod
+    def _abandon_after(sync_gen, n):
+        """Consume ``n`` chunks from a bridged sync generator, then close it.
+
+        Mimics an SSE client that reads a bit and disconnects. Call it through
+        ``_run_bounded`` so a deadlocked ``q.get()`` cannot hang the test thread.
+        """
+        for _ in range(n):
+            next(sync_gen)
+        sync_gen.close()  # StreamingResponse triggers GeneratorExit on client disconnect
+
+    def _run_bounded(self, target, timeout):
+        """Run ``target`` in a daemon thread; return True iff it finished within ``timeout``."""
+        done = threading.Event()
+        error = {}
+
+        def _wrap():
+            try:
+                target()
+            except BaseException as e:  # noqa: BLE001 - surface in assertion
+                error["e"] = e
+            finally:
+                done.set()
+
+        t = threading.Thread(target=_wrap, daemon=True)
+        t.start()
+        finished = done.wait(timeout)
+        if finished and "e" in error:
+            raise error["e"]
+        return finished
+
+    def test_worker_loop_survives_abandoned_stream(self):
+        """A consumer that stops early and disconnects must not freeze the worker loop.
+
+        The producer emits far more than the bridge's queue
+        maxsize; the consumer reads a few chunks then closes (SSE disconnect).
+        Afterwards a fresh coroutine must still run on the shared worker loop.
+        """
+        app = BedrockAgentCoreApp()
+
+        async def big_stream():
+            # Far beyond the bridge's bounded queue so a blocking producer wedges the loop.
+            for i in range(10_000):
+                yield f"chunk-{i}"
+
+        sync_gen = app._async_gen_to_sync_gen(big_stream(), contextvars.copy_context())
+        # Abandon the stream in a bounded thread (guards against a hang here too).
+        assert self._run_bounded(lambda: self._abandon_after(sync_gen, 3), timeout=10.0), (
+            "abandoning the stream hung — consumer blocked on the bridge"
+        )
+        time.sleep(0.5)  # let the orphaned producer either exit or wedge
+
+        # The shared worker loop must still be able to run new work.
+        loop = app._ensure_worker_loop()
+
+        async def _noop():
+            return "alive"
+
+        fut = asyncio.run_coroutine_threadsafe(_noop(), loop)
+        try:
+            alive = fut.result(timeout=5.0) == "alive"
+        except (concurrent.futures.TimeoutError, TimeoutError):
+            fut.cancel()
+            alive = False
+        assert alive, "worker loop is wedged after an abandoned stream — bridge deadlocked"
+
+    def test_second_session_survives_first_sessions_abandoned_stream(self):
+        """After session A abandons its stream, session B's streaming handler must still run."""
+        app = BedrockAgentCoreApp()
+
+        async def big_stream():
+            for i in range(10_000):
+                yield f"a-{i}"
+
+        gen_a = app._async_gen_to_sync_gen(big_stream(), contextvars.copy_context())
+        assert self._run_bounded(lambda: self._abandon_after(gen_a, 1), timeout=10.0), "abandoning session A hung"
+        time.sleep(0.5)
+
+        async def small_stream():
+            for i in range(3):
+                yield f"b-{i}"
+
+        gen_b = app._async_gen_to_sync_gen(small_stream(), contextvars.copy_context())
+        collected = []
+        # Consume B in a bounded thread: if the loop is starved, q.get() blocks forever
+        # and this returns False rather than hanging the suite.
+        completed = self._run_bounded(lambda: collected.extend(list(gen_b)), timeout=5.0)
+        assert completed and collected == ["b-0", "b-1", "b-2"], (
+            f"session B did not complete — worker loop starved by session A; completed={completed} got={collected}"
+        )
+
+    def test_disconnect_tears_down_producer_blocked_on_slow_downstream(self):
+        """Disconnecting while the handler is parked mid-await must cancel the producer.
+
+        The source generator yields one chunk then blocks on a long await (a slow
+        downstream). On client disconnect the producer must be cancelled so the
+        generator's ``finally`` runs and its resources are released — otherwise
+        the task and the upstream connection leak until the await returns.
+        """
+        app = BedrockAgentCoreApp()
+        cleaned_up = threading.Event()
+
+        async def stalled_stream():
+            try:
+                yield "first"
+                await asyncio.sleep(3600)  # park on a slow/hung downstream
+                yield "never"
+            finally:
+                cleaned_up.set()  # runs only if the coroutine is closed/cancelled
+
+        sync_gen = app._async_gen_to_sync_gen(stalled_stream(), contextvars.copy_context())
+        # Read the first chunk, then disconnect while the producer is parked in the await.
+        assert self._run_bounded(lambda: self._abandon_after(sync_gen, 1), timeout=10.0), "abandoning hung"
+
+        assert cleaned_up.wait(timeout=5.0), (
+            "producer parked in a mid-await was not torn down on disconnect — "
+            "source generator's finally never ran (task leaked)"
+        )
diff --git a/tests/integration/runtime/test_streaming_disconnect_integration.py b/tests/integration/runtime/test_streaming_disconnect_integration.py
new file mode 100644
index 00000000..346baf23
--- /dev/null
+++ b/tests/integration/runtime/test_streaming_disconnect_integration.py
@@ -0,0 +1,100 @@
+"""Integration test for the streaming-bridge deadlock.
+
+Boots a real uvicorn server running a BedrockAgentCoreApp with a streaming
+(async generator) entrypoint, then drives it with real HTTP requests:
+
+  1. Session A opens the SSE stream, reads a few chunks, then DISCONNECTS.
+  2. Session B (fresh) must still receive a normal response.
+
+Before the fix, A's disconnect fills the bridge's bounded queue and blocks the
+shared worker loop, so B hangs and its handler never runs. After the fix, the
+orphaned producer is torn down and B succeeds.
+
+No mocks: full ASGI stack over a loopback socket.
+"""
+
+import socket
+import threading
+import time
+
+import httpx
+import pytest
+import uvicorn
+
+from bedrock_agentcore.runtime import BedrockAgentCoreApp
+
+
+def _free_port():
+    """Return a loopback TCP port that was free at the time of the call."""
+    # Binding to port 0 lets the OS pick an unused port; the context manager
+    # closes the probe socket even if bind()/getsockname() raises.
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(("127.0.0.1", 0))
+        return s.getsockname()[1]
+
+
+@pytest.fixture()
+def live_server():
+    """Run a streaming BedrockAgentCoreApp on a real uvicorn server in a thread."""
+    app = BedrockAgentCoreApp()
+
+    @app.entrypoint
+    async def handler(payload):
+        n = int(payload.get("chunks", 5000))
+        for i in range(n):
+            yield f"data: chunk {i}\n\n"
+
+    port = _free_port()
+    config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
+    server = uvicorn.Server(config)
+    thread = threading.Thread(target=server.run, daemon=True)
+    thread.start()
+
+    # Wait for the server to accept connections (monotonic clock: immune to wall-clock jumps).
+    base = f"http://127.0.0.1:{port}"
+    deadline = time.monotonic() + 10
+    while time.monotonic() < deadline:
+        try:
+            httpx.get(f"{base}/ping", timeout=1.0)
+            break
+        except Exception:
+            time.sleep(0.1)
+    else:
+        server.should_exit = True
+        pytest.fail("live server did not start")
+
+    yield base
+
+    server.should_exit = True
+    thread.join(timeout=5)
+
+
+def test_ping_stays_healthy_and_fresh_session_works_after_disconnect(live_server):
+    """After a mid-stream client disconnect: /ping stays healthy AND a fresh session responds."""
+    base = live_server
+
+    # Session A: open stream, read a few chunks, then disconnect by leaving the context.
+    read = 0
+    with httpx.Client(timeout=30.0) as c:
+        with c.stream("POST", f"{base}/invocations", json={"sessionId": "A", "chunks": 5000}) as r:
+            for _ in r.iter_lines():
+                read += 1
+                if read >= 3:
+                    break
+    assert read >= 3
+
+    time.sleep(2)  # let the (now-orphaned) producer fill the queue if it's going to block
+
+    # The main loop must still answer health checks (it always did — proves the split).
+    ping = httpx.get(f"{base}/ping", timeout=5.0)
+    assert ping.status_code == 200
+
+    # The worker loop must still serve a fresh session. Before the fix this times out.
+    resp = httpx.post(
+        f"{base}/invocations",
+        json={"sessionId": "B", "chunks": 3},
+        headers={"Accept": "text/event-stream"},
+        timeout=15.0,
+    )
+    assert resp.status_code == 200
+    assert "chunk 0" in resp.text