diff --git a/MODERNIZATION.md b/MODERNIZATION.md index 8456deaf..e922cc62 100644 --- a/MODERNIZATION.md +++ b/MODERNIZATION.md @@ -188,7 +188,26 @@ Android NDK r28+. `mMutex` and rtc's callback mutex in `close()`/`destroy()` (teardown deadlock; fixed by calling into rtc outside `mMutex`). Reproduced 3/200 resp. 2/100 locally before the fix; 0 failures in 925 stress runs - (Debug + Release) after. + (Debug + Release) after. **A third, rarer mechanism** surfaced on 2-core + Linux runners (ConnectionLockingTest.CanLockConnections, PR #42 CI, + 2026-07-04, twice in a row then pass; never reproduced in 100 stress runs + under 10x load on macOS): on the outgoing side the connector starts socket + I/O inside `createConnection()`, but the endpoint's listeners on the + returned `PendingConnection` are only registered afterwards + (`addEndpoint` -> `changeToConnectingState`). If the main thread is + descheduled in that window for longer than one localhost websocket + + streamr handshake round-trip — realistic only on a loaded 2-core box — + `onHandshakeCompleted()` runs first; it disarms the 15 s connect watchdog + and then emits `Connected` on a fire-and-forget emitter with zero + listeners. The emission is lost, no timeout remains armed, and the + endpoint sits in the connecting state forever with the lock-request RPC + buffered (signature: rpc FutureTimeout after 10 s, waitForCondition + timeout, `hasConnection() == false`). Fixed by making `IPendingConnection` + a `ReplayEventEmitter` (late listeners receive the latest + `Connected`/`Disconnected`) and by making `emit()`'s latest-event store + + handler snapshot atomic with respect to `on()` in the eventemitter, so a + concurrently registered listener gets exactly one of live dispatch or + replay. - **Gate**: build/test green macOS + Linux, **and iOS cross-build + `iostest.sh` green — the compiler's output must stay compatible with the device's fixed libc++ runtime**. diff --git a/packages/streamr-dht/include/streamr-dht/connection/IPendingConnection.hpp b/packages/streamr-dht/include/streamr-dht/connection/IPendingConnection.hpp index 6c0eb6c3..cb52e0c3 100644 --- a/packages/streamr-dht/include/streamr-dht/connection/IPendingConnection.hpp +++ b/packages/streamr-dht/include/streamr-dht/connection/IPendingConnection.hpp @@ -21,8 +21,18 @@ using PendingConnectionEvents = std::tuple< pendingconnectionevents::Connected, pendingconnectionevents::Disconnected>; -class IPendingConnection - : public streamr::eventemitter::EventEmitter { +// ReplayEventEmitter, not the fire-and-forget EventEmitter: the connectors +// start socket I/O inside createConnection(), but the Connected/Disconnected +// listeners are only registered afterwards, when ConnectionManager wraps the +// returned pending connection in an Endpoint (addEndpoint -> +// changeToConnectingState). On a loaded machine the entire websocket + +// streamr handshake can win that race, and onHandshakeCompleted() disarms +// the connect watchdog before emitting — with a fire-and-forget emitter the +// emission is lost and the endpoint stays in the connecting state forever +// (the CanLockConnections stall on 2-core CI runners). Replay delivers the +// missed emission to the late listener instead. +class IPendingConnection : public streamr::eventemitter::ReplayEventEmitter< + PendingConnectionEvents> { public: ~IPendingConnection() override = default; virtual void onHandshakeCompleted( diff --git a/packages/streamr-dht/test/unit/PendingConnectionTest.cpp b/packages/streamr-dht/test/unit/PendingConnectionTest.cpp index 1ff54073..d2d9f681 100644 --- a/packages/streamr-dht/test/unit/PendingConnectionTest.cpp +++ b/packages/streamr-dht/test/unit/PendingConnectionTest.cpp @@ -93,6 +93,35 @@ TEST_F(PendingConnectionTest, EmitsConnected) { pendingConnection->onHandshakeCompleted(dummyConnection); } +// Regression tests for the CanLockConnections stall on slow CI runners: +// the connectors start socket I/O before ConnectionManager registers the +// endpoint's listeners on the pending connection, so a fast handshake can +// complete before registration. IPendingConnection must therefore replay +// the latest emission to late listeners. + +TEST_F(PendingConnectionTest, ReplaysConnectedToLateListener) { + auto dummyConnection = std::make_shared(); + // handshake completes before anyone listens + pendingConnection->onHandshakeCompleted(dummyConnection); + bool isEmitted = false; + pendingConnection->once( + [&](const PeerDescriptor& /* peerDescriptor */, + std::shared_ptr connection) { // NOLINT + isEmitted = true; + EXPECT_EQ(dummyConnection, connection); + }); + EXPECT_TRUE(isEmitted); +} + +TEST_F(PendingConnectionTest, ReplaysDisconnectedToLateListener) { + // connection fails before anyone listens + pendingConnection->close(false); + bool isEmitted = false; + pendingConnection->once( + [&](bool /* gracefulLeave */) { isEmitted = true; }); + EXPECT_TRUE(isEmitted); +} + TEST_F(PendingConnectionTest, DoesNotEmitConnectedIfReplaced) { bool isEmitted = false; pendingConnection->once( diff --git a/packages/streamr-eventemitter/include/streamr-eventemitter/EventEmitter.hpp b/packages/streamr-eventemitter/include/streamr-eventemitter/EventEmitter.hpp index 968e0f87..05cdda43 100644 --- a/packages/streamr-eventemitter/include/streamr-eventemitter/EventEmitter.hpp +++ b/packages/streamr-eventemitter/include/streamr-eventemitter/EventEmitter.hpp @@ -287,13 +287,25 @@ class EventEmitterImpl { MatchingEventType EventType, typename... EventArgs> void emit(EventArgs... args) { - if constexpr (ReplayLatestEventToNewListeners) { - StoredEvent storedEvent( - (args)...); - mLatestEvent = std::move(storedEvent); - } std::lock_guard guard{mEmitLoopMutex}; - createExecutingEmitLoopHandlersMap(); + { + // The latest-event store and the handler snapshot must form one + // atomic step with respect to on(): on() checks mLatestEvent and + // registers the handler while holding mEventHandlersMutex, so a + // listener registered concurrently with this emit either makes + // it into the snapshot (live dispatch) or observes the stored + // event (replay) — never neither, never both. mEventHandlersMutex + // is only ever taken inside mEmitLoopMutex here, matching the + // once-handler removal below, so no new lock order is introduced. + std::lock_guard handlersGuard{mEventHandlersMutex}; + if constexpr (ReplayLatestEventToNewListeners) { + StoredEvent + storedEvent((args)...); + std::lock_guard latestGuard{mLatestEventMutex}; + mLatestEvent = std::move(storedEvent); + } + createExecutingEmitLoopHandlersMap(); + } while (auto ret = popHandlerFromExecutingEmitLoopHandlersMap()) { auto handler = ret.value();