From c560f5a957bcdd618b70dbfc4e181ec637986dfa Mon Sep 17 00:00:00 2001 From: Raul Gomez Rodriguez Date: Wed, 10 Jun 2026 15:23:58 -0700 Subject: [PATCH] WebSocket: route internal connect completion to the Work port WebSocket::ConnectAsyncProvider and NetworkState::WebSocketConnectAsyncProvider built the internal connect async block's queue with XTaskQueueDuplicateHandle(callerQueue). That places the internal connect completion (WebSocket::ConnectComplete / NetworkState::WebSocketConnectComplete) on the caller's Completion port. WebSocket::ConnectComplete is what transitions the socket to State::Connected and allocates the provider context. Routing it through the Completion port couples connect *progress* to dispatch of the Completion port. On a task queue whose Work and Completion ports are dispatched asymmetrically by different threads -- a common pattern where background work is pumped continuously but completions are pumped from a main loop -- the connect completion can be stranded indefinitely whenever the Completion port is not currently being dispatched. The socket stays stuck in State::Connecting and every operation chained behind the connect stalls (e.g. sends fail with E_UNEXPECTED because the provider context is never allocated). Fix: build the internal connect async block's queue as a composite whose Work AND Completion ports both map to the caller's Work port, so ConnectComplete runs on the Work port as soon as the connect finishes, independent of how the caller dispatches the Completion port. This restores the long-standing pre-regression behavior. The final client completion is unaffected: it is still delivered on the caller's real Completion port via the outer (client) async block. In-process transports that run their own connect threads are unaffected, since the choice only governs where the lightweight completion bookkeeping runs. Adds VerifyWebSocketConnectCompletesOnWorkPortWithoutCompletionPump, which reproduces the asymmetric-dispatch topology with a Manual/Manual queue and asserts that pumping only the Work port drives the connect to completion (a subsequent send is accepted) while the client completion callback is still correctly deferred to the Completion port. --- Source/Global/NetworkState.cpp | 10 ++- Source/WebSocket/hcwebsocket.cpp | 9 ++- Tests/UnitTests/Tests/WebsocketTests.cpp | 77 ++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/Source/Global/NetworkState.cpp b/Source/Global/NetworkState.cpp index f09bc707..04872f62 100644 --- a/Source/Global/NetworkState.cpp +++ b/Source/Global/NetworkState.cpp @@ -361,9 +361,13 @@ HRESULT CALLBACK NetworkState::WebSocketConnectAsyncProvider(XAsyncOp op, const case XAsyncOp::Begin: { assert(data->async->queue); // Queue should never be null here - RETURN_IF_FAILED(XTaskQueueDuplicateHandle( - data->async->queue, - &context->internalAsyncBlock.queue)); + + // Run the internal connect completion on the caller's Work port so it is not stranded when the + // Completion port is dispatched asymmetrically. The client completion still fires on the + // Completion port via the outer async block. + XTaskQueuePortHandle workPort{ nullptr }; + RETURN_IF_FAILED(XTaskQueueGetPort(data->async->queue, XTaskQueuePort::Work, &workPort)); + RETURN_IF_FAILED(XTaskQueueCreateComposite(workPort, workPort, &context->internalAsyncBlock.queue)); std::unique_lock lock{ state.m_mutex }; state.m_connectingWebSockets.insert(context->clientAsyncBlock); diff --git a/Source/WebSocket/hcwebsocket.cpp b/Source/WebSocket/hcwebsocket.cpp index af331967..7511934d 100644 --- a/Source/WebSocket/hcwebsocket.cpp +++ b/Source/WebSocket/hcwebsocket.cpp @@ -225,9 +225,12 @@ HRESULT CALLBACK WebSocket::ConnectAsyncProvider(XAsyncOp op, XAsyncProviderData RETURN_HR_IF(E_UNEXPECTED, ws->m_state != State::Initial); ws->ClearResponseHeadersLockHeld(); - RETURN_IF_FAILED(XTaskQueueDuplicateHandle( - data->async->queue, - &context->internalAsyncBlock.queue)); + // Run the internal connect completion on the caller's Work port so it is not stranded when the + // Completion port is dispatched asymmetrically. The client completion still fires on the + // Completion port via the outer async block. + XTaskQueuePortHandle workPort{ nullptr }; + RETURN_IF_FAILED(XTaskQueueGetPort(data->async->queue, XTaskQueuePort::Work, &workPort)); + RETURN_IF_FAILED(XTaskQueueCreateComposite(workPort, workPort, &context->internalAsyncBlock.queue)); ws->m_state = State::Connecting; lock.unlock(); diff --git a/Tests/UnitTests/Tests/WebsocketTests.cpp b/Tests/UnitTests/Tests/WebsocketTests.cpp index 33f5a8d3..54db3d05 100644 --- a/Tests/UnitTests/Tests/WebsocketTests.cpp +++ b/Tests/UnitTests/Tests/WebsocketTests.cpp @@ -1164,6 +1164,83 @@ DEFINE_TEST_CLASS(WebsocketTests) HCCleanup(); } + // Regression guard: connect must complete on the Work port without the Completion port being + // dispatched, while the client completion callback is still deferred to the Completion port. + DEFINE_TEST_CASE(VerifyWebSocketConnectCompletesOnWorkPortWithoutCompletionPump) + { + VERIFY_ARE_EQUAL(S_OK, HCSetWebSocketFunctions( + Test_Internal_HCWebSocketConnectAsync, + Test_Internal_HCWebSocketSendMessageAsync, + Test_Internal_HCWebSocketSendBinaryMessageAsync, + Test_Internal_HCWebSocketDisconnect, + nullptr)); + VERIFY_ARE_EQUAL(S_OK, HCInitialize(nullptr)); + + // Manual / Manual queue: the caller controls when each port is dispatched. + XTaskQueueHandle queue{ nullptr }; + VERIFY_ARE_EQUAL(S_OK, XTaskQueueCreate(XTaskQueueDispatchMode::Manual, XTaskQueueDispatchMode::Manual, &queue)); + + g_HCWebSocketConnect_Called = false; + g_HCWebSocketSendMessage_Called = false; + + HCWebsocketHandle websocket{ nullptr }; + VERIFY_ARE_EQUAL(S_OK, HCWebSocketCreate(&websocket, nullptr, nullptr, nullptr, nullptr)); + VERIFY_IS_NOT_NULL(websocket); + + std::atomic clientCompleted{ false }; + struct ConnectState { std::atomic* completed; } connectState{ &clientCompleted }; + + XAsyncBlock asyncBlock{}; + asyncBlock.queue = queue; + asyncBlock.context = &connectState; + asyncBlock.callback = [](XAsyncBlock* async) + { + auto state = static_cast(async->context); + state->completed->store(true); + }; + + VERIFY_ARE_EQUAL(S_OK, HCWebSocketConnectAsync("test", "subProtoTest", websocket, &asyncBlock)); + + // Pump ONLY the Work port: the internal connect completion runs here, reaching Connected. + bool workDispatched = true; + while (workDispatched) + { + workDispatched = XTaskQueueDispatch(queue, XTaskQueuePort::Work, 0); + } + + VERIFY_IS_TRUE(g_HCWebSocketConnect_Called); + + // A send is accepted only once the socket is Connected (SendAsync returns E_UNEXPECTED before + // the connect completion allocates the provider context), so this probes that the connect + // completion was not stranded on the unpumped Completion port. + XAsyncBlock probeSendBlock{}; + probeSendBlock.queue = queue; + VERIFY_ARE_EQUAL(S_OK, HCWebSocketSendMessageAsync(websocket, "probe", &probeSendBlock)); + + // The client completion is deferred to the Completion port, so it has not fired yet. + VERIFY_IS_FALSE(clientCompleted.load()); + + // Drain both ports to deliver the client and send completions. + bool dispatched = true; + while (dispatched) + { + bool work = XTaskQueueDispatch(queue, XTaskQueuePort::Work, 0); + bool completion = XTaskQueueDispatch(queue, XTaskQueuePort::Completion, 0); + dispatched = work || completion; + } + + VERIFY_IS_TRUE(clientCompleted.load()); + VERIFY_IS_TRUE(g_HCWebSocketSendMessage_Called); + + WebSocketCompletionResult connectResult{}; + VERIFY_ARE_EQUAL(S_OK, HCGetWebSocketConnectResult(&asyncBlock, &connectResult)); + VERIFY_ARE_EQUAL(S_OK, connectResult.errorCode); + + VERIFY_ARE_EQUAL(S_OK, HCWebSocketCloseHandle(websocket)); + XTaskQueueCloseHandle(queue); + HCCleanup(); + } + }; NAMESPACE_XBOX_HTTP_CLIENT_TEST_END