Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions src/bedrock_agentcore/runtime/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

import asyncio
import contextlib
import contextvars
import functools
import inspect
Expand Down Expand Up @@ -725,29 +726,80 @@ def _async_gen_to_sync_gen(self, async_gen: Any, ctx: contextvars.Context) -> An
a thread-safe queue and yielded synchronously. Starlette's StreamingResponse
iterates this sync generator via iterate_in_threadpool, so the main event
loop is never blocked.

The producer must never block the shared worker loop, or a full queue
would freeze it and starve every other session. When the consumer stops
(SSE client disconnect), the producer task is cancelled so it is torn
down even while parked awaiting a slow downstream, and a stop event
keeps a queue-full producer from spinning before the cancel lands.
"""
worker_loop = self._ensure_worker_loop()
q: queue.Queue = queue.Queue(maxsize=100)
_DONE = object()
stop = threading.Event()
task_cell: dict[str, asyncio.Task] = {}

async def _produce() -> None:
    """Drain ``async_gen`` into the bridge queue without blocking the worker loop.

    Every item is handed to ``_offer`` (a non-blocking put with an async
    back-off) instead of a blocking ``q.put``: a blocking put on a full queue
    would freeze the loop this coroutine runs on, and pairing it with
    ``_offer`` would enqueue each item twice.

    Queue items are ``(ok, value)`` tuples: ``(True, chunk)`` for data,
    ``(True, _DONE)`` as the end-of-stream marker, and ``(False, exc)`` to
    hand a failure to the consumer, which re-raises it.

    Raises:
        asyncio.CancelledError: re-raised when the task is cancelled, so the
            cancellation unwinds through the ``finally`` that closes the source.
    """
    _restore_context(ctx)
    try:
        async for chunk in async_gen:
            await _offer((True, chunk))
            if stop.is_set():
                # The consumer stopped; nothing will read further chunks.
                return
        await _offer((True, _DONE))
    except asyncio.CancelledError:
        # Consumer disconnected; nothing reads the queue anymore. Let the
        # cancellation unwind so the finally below still closes the source.
        raise
    except BaseException as e:
        # Forward the failure to the consumer side of the bridge.
        await _offer((False, e))
    finally:
        # Ensure the source generator releases its resources if the
        # consumer disconnected mid-stream.
        aclose = getattr(async_gen, "aclose", None)
        if aclose is not None:
            # Best-effort close: an error while closing must not mask the
            # outcome of the stream itself.
            with contextlib.suppress(Exception):
                await aclose()

async def _offer(item: Any) -> None:
    """Enqueue ``item`` without ever making a blocking call on the worker loop.

    Tries a non-blocking put; if the queue is full, yields to the event loop
    for 10 ms and tries again. Returns without enqueuing once the consumer
    has set the stop event.
    """
    while True:
        if stop.is_set():
            # Consumer is gone: drop the item instead of waiting for room.
            return
        try:
            q.put_nowait(item)
        except queue.Full:
            # Back off asynchronously so other coroutines keep running.
            await asyncio.sleep(0.01)
        else:
            return

def _start() -> None:
    """Create the producer task and record it so it can be cancelled later."""
    producer_task = worker_loop.create_task(_produce())
    task_cell["task"] = producer_task

worker_loop.call_soon_threadsafe(lambda: worker_loop.create_task(_produce()))
worker_loop.call_soon_threadsafe(_start)

while True:
ok, value = q.get()
if not ok:
raise value
if value is _DONE:
break
yield value
try:
while True:
ok, value = q.get()
if not ok:
raise value
if value is _DONE:
break
yield value
finally:
# Consumer is done or the client disconnected (GeneratorExit). Set the
# stop flag (unblocks a queue-full producer that hasn't started its
# await yet) and cancel the task so a producer parked in a mid-chunk
# await is torn down immediately instead of leaking until it returns.
stop.set()

def _cancel() -> None:
task = task_cell.get("task")
if task is not None and not task.done():
task.cancel()

worker_loop.call_soon_threadsafe(_cancel)

async def _invoke_handler(self, handler: Callable, request_context: Any, takes_context: bool, payload: Any) -> Any:
"""Dispatch handler execution based on handler type.
Expand Down
133 changes: 133 additions & 0 deletions tests/bedrock_agentcore/runtime/test_app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import concurrent.futures
import contextlib
import contextvars
import json
import os
import threading
Expand Down Expand Up @@ -3061,3 +3063,134 @@ def test_x_amzn_prefix_is_blocked(self):
def test_authorization_is_allowed(self):
# Authorization is normalized to canonical casing by the loop, but is_forwardable_header must not block it
assert is_forwardable_header("Authorization") is True


class TestStreamingBridgeDeadlock:
    """Regression coverage for the async->sync streaming bridge deadlock.

    ``_async_gen_to_sync_gen`` moves chunks through a bounded queue. The
    producer coroutine runs on the single shared worker loop, so if it ever
    blocks on a full queue (fast handler, slow or absent SSE consumer) the
    whole loop stalls and every other session's handler is starved. When the
    consumer disconnects, the producer has to be torn down, not left running.
    """

    @staticmethod
    def _abandon_after(sync_gen, n):
        """Pull ``n`` chunks from a bridged sync generator, then close it.

        Stands in for an SSE client that reads a little and then disconnects.
        Callers run this through ``_run_bounded`` so that a deadlocked bridge
        cannot hang the test thread.
        """
        remaining = n
        while remaining > 0:
            next(sync_gen)
            remaining -= 1
        # StreamingResponse triggers GeneratorExit on client disconnect
        sync_gen.close()

    def _run_bounded(self, target, timeout):
        """Run ``target`` on a daemon thread and wait at most ``timeout`` seconds.

        Returns True when ``target`` finished in time and False when it did
        not. An exception raised by ``target`` is re-raised on the calling
        thread, but only if ``target`` finished within the timeout.
        """
        finished_evt = threading.Event()
        failures = []

        def _runner():
            try:
                target()
            except BaseException as exc:  # noqa: BLE001 - surface in assertion
                failures.append(exc)
            finally:
                finished_evt.set()

        threading.Thread(target=_runner, daemon=True).start()
        if not finished_evt.wait(timeout):
            return False
        if failures:
            raise failures[0]
        return True

    def test_worker_loop_survives_abandoned_stream(self):
        """An early disconnect must leave the shared worker loop responsive.

        The handler yields far more chunks than the bridge queue can hold.
        The consumer takes three and closes the generator; afterwards a new
        coroutine submitted to the worker loop must still complete.
        """
        app = BedrockAgentCoreApp()

        async def big_stream():
            # Well past the bounded queue size, so a blocking producer would wedge the loop.
            for idx in range(10_000):
                yield f"chunk-{idx}"

        bridged = app._async_gen_to_sync_gen(big_stream(), contextvars.copy_context())

        def _disconnect():
            self._abandon_after(bridged, 3)

        # Bounded, so a hang inside the bridge fails the assertion instead of the suite.
        assert self._run_bounded(_disconnect, timeout=10.0), (
            "abandoning the stream hung — consumer blocked on the bridge"
        )
        time.sleep(0.5)  # give the orphaned producer time to exit (or to wedge)

        async def _noop():
            return "alive"

        # New work submitted to the shared worker loop has to run to completion.
        loop = app._ensure_worker_loop()
        probe = asyncio.run_coroutine_threadsafe(_noop(), loop)
        try:
            outcome = probe.result(timeout=5.0)
        except (concurrent.futures.TimeoutError, TimeoutError):
            probe.cancel()
            alive = False
        else:
            alive = outcome == "alive"
        assert alive, "worker loop is wedged after an abandoned stream — bridge deadlocked"

    def test_second_session_survives_first_sessions_abandoned_stream(self):
        """Session B's stream must still complete after session A walks away from its own."""
        app = BedrockAgentCoreApp()

        async def big_stream():
            for idx in range(10_000):
                yield f"a-{idx}"

        gen_a = app._async_gen_to_sync_gen(big_stream(), contextvars.copy_context())

        def _disconnect_a():
            self._abandon_after(gen_a, 1)

        assert self._run_bounded(_disconnect_a, timeout=10.0), "abandoning session A hung"
        time.sleep(0.5)

        async def small_stream():
            for idx in range(3):
                yield f"b-{idx}"

        gen_b = app._async_gen_to_sync_gen(small_stream(), contextvars.copy_context())
        collected = []

        def _drain_b():
            collected.extend(list(gen_b))

        # Bounded as well: a starved loop would leave q.get() blocked forever,
        # which shows up here as False rather than a hung suite.
        completed = self._run_bounded(_drain_b, timeout=5.0)
        assert completed and collected == ["b-0", "b-1", "b-2"], (
            f"session B did not complete — worker loop starved by session A; completed={completed} got={collected}"
        )

    def test_disconnect_tears_down_producer_blocked_on_slow_downstream(self):
        """A disconnect must cancel a producer that is parked inside an await.

        The source yields a single chunk and then sleeps for an hour, standing
        in for a slow downstream. Closing the bridged generator has to cancel
        the producer so the source's ``finally`` runs; the test fails if that
        cleanup is not observed within five seconds.
        """
        app = BedrockAgentCoreApp()
        cleaned_up = threading.Event()

        async def stalled_stream():
            try:
                yield "first"
                await asyncio.sleep(3600)  # stand-in for a slow or hung downstream
                yield "never"
            finally:
                cleaned_up.set()  # only reached when the generator is closed or cancelled

        bridged = app._async_gen_to_sync_gen(stalled_stream(), contextvars.copy_context())

        def _disconnect():
            self._abandon_after(bridged, 1)

        # Take the first chunk, then disconnect while the producer sits in the long await.
        assert self._run_bounded(_disconnect, timeout=10.0), "abandoning hung"

        assert cleaned_up.wait(timeout=5.0), (
            "producer parked in a mid-await was not torn down on disconnect — "
            "source generator's finally never ran (task leaked)"
        )
99 changes: 99 additions & 0 deletions tests/integration/runtime/test_streaming_disconnect_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Integration test for the streaming-bridge deadlock.

Boots a real uvicorn server running a BedrockAgentCoreApp with a streaming
(async generator) entrypoint, then drives it with real HTTP requests:

1. Session A opens the SSE stream, reads a few chunks, then DISCONNECTS.
2. Session B (fresh) must still receive a normal response.

Before the fix, once A disconnects the orphaned producer fills the bridge's
bounded queue and blocks the shared worker loop, so B hangs and its handler
never runs. After the fix, the orphaned producer is torn down and B succeeds.

No mocks: full ASGI stack over a loopback socket.
"""

import socket
import threading
import time

import httpx
import pytest
import uvicorn

from bedrock_agentcore.runtime import BedrockAgentCoreApp


def _free_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
s.close()
return port


@pytest.fixture()
def live_server():
    """Run a streaming BedrockAgentCoreApp on a real uvicorn server in a thread.

    Yields:
        The base URL (``http://127.0.0.1:<port>``) of the running server.

    The fixture polls ``/ping`` for up to 10 seconds and fails the test if the
    server never answers. On teardown, and on startup failure, the server is
    asked to exit and its thread is joined for up to 5 seconds.
    """
    app = BedrockAgentCoreApp()

    @app.entrypoint
    async def handler(payload):
        # Stream ``chunks`` SSE-formatted lines (5000 unless the payload says otherwise).
        n = int(payload.get("chunks", 5000))
        for i in range(n):
            yield f"data: chunk {i}\n\n"

    port = _free_port()
    config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    # Wait for the server to accept connections. A monotonic clock keeps the
    # deadline immune to wall-clock adjustments during the wait.
    base = f"http://127.0.0.1:{port}"
    deadline = time.monotonic() + 10
    while time.monotonic() < deadline:
        try:
            httpx.get(f"{base}/ping", timeout=1.0)
        except httpx.HTTPError:
            # Not accepting connections yet (refused / timed out): retry shortly.
            # Anything that is not a transport-level failure propagates.
            time.sleep(0.1)
        else:
            break
    else:
        # Never came up: stop the server thread before failing the test.
        server.should_exit = True
        thread.join(timeout=5)
        pytest.fail("live server did not start")

    try:
        yield base
    finally:
        # Shut the server down even if the fixture generator is closed early.
        server.should_exit = True
        thread.join(timeout=5)


def test_ping_stays_healthy_and_fresh_session_works_after_disconnect(live_server):
    """A mid-stream disconnect must leave /ping healthy and let a new session stream.

    Session A reads three lines of a 5000-chunk stream and drops the
    connection. After a pause, ``/ping`` must return 200 and session B's
    three-chunk invocation must return 200 with ``chunk 0`` in its body.
    """
    base = live_server
    invocations_url = f"{base}/invocations"

    # Session A: start streaming, take a few lines, then disconnect by leaving the contexts.
    read = 0
    with httpx.Client(timeout=30.0) as client:
        with client.stream("POST", invocations_url, json={"sessionId": "A", "chunks": 5000}) as response:
            for read, _line in enumerate(response.iter_lines(), start=1):
                if read >= 3:
                    break
    assert read >= 3

    # Give the now-orphaned producer time to fill the queue if it is going to block.
    time.sleep(2)

    # Health checks are served by the main loop and must keep answering.
    ping_status = httpx.get(f"{base}/ping", timeout=5.0).status_code
    assert ping_status == 200

    # A fresh session needs the worker loop; with a wedged loop this request times out.
    resp = httpx.post(
        invocations_url,
        json={"sessionId": "B", "chunks": 3},
        headers={"Accept": "text/event-stream"},
        timeout=15.0,
    )
    assert resp.status_code == 200
    assert "chunk 0" in resp.text
Loading