Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion MODERNIZATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,26 @@ Android NDK r28+.
`mMutex` and rtc's callback mutex in `close()`/`destroy()` (teardown
deadlock; fixed by calling into rtc outside `mMutex`). Reproduced 3/200
resp. 2/100 locally before the fix; 0 failures in 925 stress runs
(Debug + Release) after.
(Debug + Release) after. **A third, rarer mechanism** surfaced on 2-core
Linux runners (ConnectionLockingTest.CanLockConnections, PR #42 CI,
2026-07-04, twice in a row then pass; never reproduced in 100 stress runs
under 10x load on macOS): on the outgoing side the connector starts socket
I/O inside `createConnection()`, but the endpoint's listeners on the
returned `PendingConnection` are only registered afterwards
(`addEndpoint` -> `changeToConnectingState`). If the main thread is
descheduled in that window for longer than one localhost websocket +
streamr handshake round-trip — realistic only on a loaded 2-core box —
`onHandshakeCompleted()` runs first; it disarms the 15 s connect watchdog
and then emits `Connected` on a fire-and-forget emitter with zero
listeners. The emission is lost, no timeout remains armed, and the
endpoint sits in the connecting state forever with the lock-request RPC
buffered (signature: rpc FutureTimeout after 10 s, waitForCondition
timeout, `hasConnection() == false`). Fixed by making `IPendingConnection`
a `ReplayEventEmitter` (late listeners receive the latest
`Connected`/`Disconnected`) and by making `emit()`'s latest-event store +
handler snapshot atomic with respect to `on()` in the eventemitter, so a
concurrently registered listener gets exactly one of live dispatch or
replay.
- **Gate**: build/test green macOS + Linux, **and iOS cross-build +
`iostest.sh` green — the compiler's output must stay compatible with the
device's fixed libc++ runtime**.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,18 @@ using PendingConnectionEvents = std::tuple<
pendingconnectionevents::Connected,
pendingconnectionevents::Disconnected>;

class IPendingConnection
: public streamr::eventemitter::EventEmitter<PendingConnectionEvents> {
// ReplayEventEmitter, not the fire-and-forget EventEmitter: the connectors
// start socket I/O inside createConnection(), but the Connected/Disconnected
// listeners are only registered afterwards, when ConnectionManager wraps the
// returned pending connection in an Endpoint (addEndpoint ->
// changeToConnectingState). On a loaded machine the entire websocket +
// streamr handshake can win that race, and onHandshakeCompleted() disarms
// the connect watchdog before emitting — with a fire-and-forget emitter the
// emission is lost and the endpoint stays in the connecting state forever
// (the CanLockConnections stall on 2-core CI runners). Replay delivers the
// missed emission to the late listener instead.
class IPendingConnection : public streamr::eventemitter::ReplayEventEmitter<
PendingConnectionEvents> {
public:
~IPendingConnection() override = default;
virtual void onHandshakeCompleted(
Expand Down
29 changes: 29 additions & 0 deletions packages/streamr-dht/test/unit/PendingConnectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,35 @@ TEST_F(PendingConnectionTest, EmitsConnected) {
pendingConnection->onHandshakeCompleted(dummyConnection);
}

// Regression tests for the CanLockConnections stall on slow CI runners:
// the connectors start socket I/O before ConnectionManager registers the
// endpoint's listeners on the pending connection, so a fast handshake can
// complete before registration. IPendingConnection must therefore replay
// the latest emission to late listeners.

TEST_F(PendingConnectionTest, ReplaysConnectedToLateListener) {
auto dummyConnection = std::make_shared<DummyConnection>();
// handshake completes before anyone listens
pendingConnection->onHandshakeCompleted(dummyConnection);
bool isEmitted = false;
pendingConnection->once<Connected>(
[&](const PeerDescriptor& /* peerDescriptor */,
std::shared_ptr<Connection> connection) { // NOLINT
isEmitted = true;
EXPECT_EQ(dummyConnection, connection);
});
EXPECT_TRUE(isEmitted);
}

TEST_F(PendingConnectionTest, ReplaysDisconnectedToLateListener) {
// connection fails before anyone listens
pendingConnection->close(false);
bool isEmitted = false;
pendingConnection->once<Disconnected>(
[&](bool /* gracefulLeave */) { isEmitted = true; });
EXPECT_TRUE(isEmitted);
}

TEST_F(PendingConnectionTest, DoesNotEmitConnectedIfReplaced) {
bool isEmitted = false;
pendingConnection->once<Connected>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,25 @@ class EventEmitterImpl {
MatchingEventType<EmitterEventType> EventType,
typename... EventArgs>
void emit(EventArgs... args) {
if constexpr (ReplayLatestEventToNewListeners) {
StoredEvent<typename EmitterEventType::ArgumentTypes> storedEvent(
(args)...);
mLatestEvent = std::move(storedEvent);
}
std::lock_guard guard{mEmitLoopMutex};
createExecutingEmitLoopHandlersMap();
{
// The latest-event store and the handler snapshot must form one
// atomic step with respect to on(): on() checks mLatestEvent and
// registers the handler while holding mEventHandlersMutex, so a
// listener registered concurrently with this emit either makes
// it into the snapshot (live dispatch) or observes the stored
// event (replay) — never neither, never both. mEventHandlersMutex
// is only ever taken inside mEmitLoopMutex here, matching the
// once-handler removal below, so no new lock order is introduced.
std::lock_guard handlersGuard{mEventHandlersMutex};
if constexpr (ReplayLatestEventToNewListeners) {
StoredEvent<typename EmitterEventType::ArgumentTypes>
storedEvent((args)...);
std::lock_guard latestGuard{mLatestEventMutex};
mLatestEvent = std::move(storedEvent);
}
createExecutingEmitLoopHandlersMap();
}

while (auto ret = popHandlerFromExecutingEmitLoopHandlersMap()) {
auto handler = ret.value();
Expand Down
Loading