From a09cd0b752520f6e37056aaa6e6b155a617c34bd Mon Sep 17 00:00:00 2001 From: Aleksandr Platonenkov Date: Fri, 5 Jun 2026 21:01:53 -0300 Subject: [PATCH] fix: thread-safe RequestManager and surface stream-handler errors - RequestManager: drop shared mutable nextId; generate a Guid per call and register via a single atomic ConcurrentDictionary.TryAdd, fixing the race that threw "Response with id '$' is already pending" (or dropped a pending promise) under concurrent requests on one connection - connection: surface exceptions thrown by stream handlers (OnLedgerClosed, OnTransaction, etc.) via the OnError event instead of swallowing them into a debug trace; the message loop stays alive and a throwing OnError handler is contained - docs: clarify Xrpl.Client.Exceptions.TimeoutException is not System.TimeoutException to avoid mismatched catch clauses - tests: add RequestManager concurrency stress tests and a stream-handler error-propagation test - bump Xrpl package version to 10.4.2.0 --- CHANGES.md | 5 + Tests/Xrpl.Tests/Client/TestSubscribe.cs | 22 ++++ .../Client/TestURequestManagerConcurrency.cs | 109 ++++++++++++++++++ Xrpl/Client/Exceptions/XrplException.cs | 10 +- Xrpl/Client/RequestManager.cs | 44 ++----- Xrpl/Client/connection.cs | 38 +++++- Xrpl/Xrpl.csproj | 2 +- 7 files changed, 192 insertions(+), 38 deletions(-) create mode 100644 Tests/Xrpl.Tests/Client/TestURequestManagerConcurrency.cs diff --git a/CHANGES.md b/CHANGES.md index 4f9546ed..d4bda5f0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,10 @@ # Changes +### 10.4.2.0 06/05/2026 +* Fix thread-unsafe request id assignment in `RequestManager` — concurrent requests on a single connection (e.g. `Task.WhenAll` over several `BookOffers`) could collide on the same id and throw `Response with id '$' is already pending` or drop a pending promise. Removed the shared `nextId` field; each call now generates its own `Guid` and registers via a single atomic `ConcurrentDictionary.TryAdd`, enabling parallel requests on one connection +* Surface exceptions thrown by stream handlers (`OnLedgerClosed`, `OnTransaction`, etc.) through the `OnError` event instead of swallowing them into a debug trace — consumer bugs are now observable, while the message loop stays alive and a throwing `OnError` handler is contained +* Clarify in XML docs that `Xrpl.Client.Exceptions.TimeoutException` is not `System.TimeoutException` (it derives from `XrplException`), to avoid mismatched `catch` clauses + ### 10.4.1.0 05/28/2026 * Fix `IouValue` (IOU token amount) parsing to accept a trailing decimal point (e.g. `"128700."`), aligning with `xrpl.js` / `ripple-binary-codec` and `rippled` `STAmount` reference behavior — previously the stricter validation regex rejected a value with no digits after the dot, breaking signing of transactions (e.g. `AMMDeposit` via WalletConnect) that carried such amounts * Relax IOU value regex fractional group from `(\.(\d+))?` to `(\.(\d*))?` while adding a `(?=\.?\d)` lookahead that still requires at least one mantissa digit — so trailing/leading dots (`"128700."`, `".5"`) parse but bare-dot inputs (`"."`, `".e10"`) are rejected, matching BigNumber; deduplicate the regex by reusing the single `IouValue.ValueRegex` constant in `AmountValue.cs` and `ExtenstionHelpers.cs` diff --git a/Tests/Xrpl.Tests/Client/TestSubscribe.cs b/Tests/Xrpl.Tests/Client/TestSubscribe.cs index 9aefde8d..edef46fd 100644 --- a/Tests/Xrpl.Tests/Client/TestSubscribe.cs +++ b/Tests/Xrpl.Tests/Client/TestSubscribe.cs @@ -257,6 +257,28 @@ public async Task TestEmitsBookChanges() Assert.AreEqual("LESS", change.AssetB.ToString()); } + [TestMethod] + public async Task StreamHandlerException_IsSurfacedViaOnError() + { + TaskCompletionSource errorReported = new TaskCompletionSource(); + + runner.client.connection.OnLedgerClosed += _ => + throw new InvalidOperationException("consumer handler bug"); + + runner.client.connection.OnError += (error, errorMessage, message, data) => + { + errorReported.TrySetResult(errorMessage); + return Task.CompletedTask; + }; + + string jsonString = "{\"fee_base\":10,\"fee_ref\":10,\"ledger_hash\":\"B3980C722D71873D6708723E71B7A28C826BC66C58712ADCEC61603415305CD1\",\"ledger_index\":66093872,\"ledger_time\":683942720,\"reserve_base\":20000000,\"reserve_inc\":5000000,\"txn_count\":70,\"type\":\"ledgerClosed\",\"validated_ledgers\":\"65201743-66093872\"}"; + await runner.client.connection.OnMessage(jsonString); + + Task completed = await Task.WhenAny(errorReported.Task, Task.Delay(5000)); + Assert.AreEqual(errorReported.Task, completed, + "Exception thrown by a stream handler was swallowed instead of surfaced via OnError"); + } + [TestMethod] public async Task TestEmitsServerStatus() { diff --git a/Tests/Xrpl.Tests/Client/TestURequestManagerConcurrency.cs b/Tests/Xrpl.Tests/Client/TestURequestManagerConcurrency.cs new file mode 100644 index 00000000..3c2c3c40 --- /dev/null +++ b/Tests/Xrpl.Tests/Client/TestURequestManagerConcurrency.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using Microsoft.VisualStudio.TestTools.UnitTesting; + +using Xrpl.Client; +using Xrpl.Client.Exceptions; + +namespace Xrpl.Tests.ClientLib +{ + [TestClass] + public class TestURequestManagerConcurrency + { + private class FakeRequest + { + public Guid? Id { get; set; } + } + + private static void RunConcurrent(Action body, out IReadOnlyList errors) + { + int threadCount = Math.Max(8, Environment.ProcessorCount * 2); + const int iterationsPerThread = 500; + + ConcurrentBag collected = new ConcurrentBag(); + Barrier barrier = new Barrier(threadCount); + Thread[] threads = new Thread[threadCount]; + + for (int t = 0; t < threadCount; t++) + { + threads[t] = new Thread(() => + { + barrier.SignalAndWait(); + for (int i = 0; i < iterationsPerThread; i++) + { + try + { + body(); + } + catch (Exception ex) + { + collected.Add(ex); + } + } + }); + } + + foreach (Thread thread in threads) + thread.Start(); + foreach (Thread thread in threads) + thread.Join(); + + errors = collected.ToArray(); + } + + [TestMethod] + public void CreateRequest_ConcurrentCalls_AssignUniqueIdsWithoutCollision() + { + RequestManager manager = new RequestManager(); + ConcurrentBag ids = new ConcurrentBag(); + + RunConcurrent( + () => + { + RequestManager.XrplRequest request = manager.CreateRequest( + new Dictionary(), + System.Threading.Timeout.InfiniteTimeSpan); + ids.Add(request.Id); + }, + out IReadOnlyList errors); + + Assert.AreEqual(0, errors.Count, + $"Concurrent CreateRequest threw {errors.Count} exception(s): " + + string.Join(" | ", errors.Take(3).Select(e => e.Message))); + + List all = ids.ToList(); + Assert.AreEqual(all.Count, all.Distinct().Count(), + "Concurrent CreateRequest assigned duplicate ids"); + } + + [TestMethod] + public void CreateGRequest_ConcurrentCalls_AssignUniqueIdsWithoutCollision() + { + RequestManager manager = new RequestManager(); + ConcurrentBag ids = new ConcurrentBag(); + + RunConcurrent( + () => + { + RequestManager.XrplGRequest request = manager.CreateGRequest( + new FakeRequest(), + System.Threading.Timeout.InfiniteTimeSpan); + ids.Add(request.Id); + }, + out IReadOnlyList errors); + + Assert.AreEqual(0, errors.Count, + $"Concurrent CreateGRequest threw {errors.Count} exception(s): " + + string.Join(" | ", errors.Take(3).Select(e => e.Message))); + + List all = ids.ToList(); + Assert.AreEqual(all.Count, all.Distinct().Count(), + "Concurrent CreateGRequest assigned duplicate ids"); + } + } +} diff --git a/Xrpl/Client/Exceptions/XrplException.cs b/Xrpl/Client/Exceptions/XrplException.cs index 5dae4e81..c315fc7d 100644 --- a/Xrpl/Client/Exceptions/XrplException.cs +++ b/Xrpl/Client/Exceptions/XrplException.cs @@ -96,7 +96,15 @@ public DisconnectedException(string message) : base(message) /// public class RippledNotInitializedException : XrplException { } /// - /// Exception thrown when xrpl.js times out. + /// Exception thrown when a request to rippled times out. + /// + /// IMPORTANT: this is Xrpl.Client.Exceptions.TimeoutException, NOT + /// . It derives from , so a + /// catch (System.TimeoutException) will NOT catch it. Catch this type (or its base + /// ) explicitly; if both using System; and + /// using Xrpl.Client.Exceptions; are in scope, fully-qualify the type to avoid catching + /// the wrong one. + /// /// public class TimeoutException : XrplException { diff --git a/Xrpl/Client/RequestManager.cs b/Xrpl/Client/RequestManager.cs index 10d38a53..7f19c8aa 100644 --- a/Xrpl/Client/RequestManager.cs +++ b/Xrpl/Client/RequestManager.cs @@ -42,7 +42,6 @@ public class XrplGRequest public Task Promise { get; set; } } - private Guid nextId = Guid.NewGuid(); private readonly ConcurrentDictionary timeoutsAwaitingResponse = new ConcurrentDictionary(); private readonly ConcurrentDictionary promisesAwaitingResponse = new ConcurrentDictionary(); private readonly JsonSerializerOptions serializerOptions = XrplJsonOptions.Default; @@ -166,27 +165,14 @@ public XrplGRequest CreateGRequest(R request, TimeSpan timeout, Cancellati $"Timeout must be positive or Timeout.InfiniteTimeSpan, but was {timeout.TotalSeconds:F1}s"); } - Guid newId; var info = request.GetType().GetProperty("Id"); - if (info.GetValue(request) == null) - { - newId = this.nextId; - this.nextId = Guid.NewGuid(); - } - else - { - newId = (Guid)info.GetValue(request); - } + object existingId = info.GetValue(request); + Guid newId = existingId == null ? Guid.NewGuid() : (Guid)existingId; info.SetValue(request, newId, null); string newRequest = JsonSerializer.Serialize(request, serializerOptions); - if (this.promisesAwaitingResponse.ContainsKey(newId)) - { - throw new XrplException($"Response with id '${newId}' is already pending"); - } - TaskCompletionSource task = new TaskCompletionSource(); TaskInfo taskInfo = new TaskInfo(); taskInfo.TaskId = newId; @@ -194,7 +180,10 @@ public XrplGRequest CreateGRequest(R request, TimeSpan timeout, Cancellati taskInfo.RemoveUponCompletion = true; taskInfo.Type = typeof(T); - promisesAwaitingResponse.TryAdd(newId, taskInfo); + if (!promisesAwaitingResponse.TryAdd(newId, taskInfo)) + { + throw new XrplException($"Response with id '${newId}' is already pending"); + } if (cancellationToken.CanBeCanceled) { @@ -249,27 +238,13 @@ public XrplRequest CreateRequest(Dictionary request, TimeSpan ti $"Timeout must be positive or Timeout.InfiniteTimeSpan, but was {timeout.TotalSeconds:F1}s"); } - Guid newId; var hasId = request.TryGetValue("id", out var id); - if (!hasId) - { - newId = this.nextId; - this.nextId = Guid.NewGuid(); - } - else - { - newId = (Guid)id; - } + Guid newId = hasId ? (Guid)id : Guid.NewGuid(); request["id"] = newId; string newRequest = JsonSerializer.Serialize(request, serializerOptions); - if (this.promisesAwaitingResponse.ContainsKey(newId)) - { - throw new XrplException($"Response with id '${newId}' is already pending"); - } - TaskCompletionSource> task = new TaskCompletionSource>(); TaskInfo taskInfo = new TaskInfo(); taskInfo.TaskId = newId; @@ -277,7 +252,10 @@ public XrplRequest CreateRequest(Dictionary request, TimeSpan ti taskInfo.RemoveUponCompletion = true; taskInfo.Type = typeof(Dictionary); - promisesAwaitingResponse.TryAdd(newId, taskInfo); + if (!promisesAwaitingResponse.TryAdd(newId, taskInfo)) + { + throw new XrplException($"Response with id '${newId}' is already pending"); + } if (cancellationToken.CanBeCanceled) { diff --git a/Xrpl/Client/connection.cs b/Xrpl/Client/connection.cs index 10a54ad5..4a13e754 100644 --- a/Xrpl/Client/connection.cs +++ b/Xrpl/Client/connection.cs @@ -2419,14 +2419,14 @@ private void StartMessageProcessor() { if (cts.Token.IsCancellationRequested) return; - + try { await ProcessStreamMessageAsync(message).ConfigureAwait(false); } catch (Exception ex) { - Debug.WriteLine($"{DateTime.Now}Stream message processing error: {ex.Message}"); + await NotifyStreamProcessingErrorAsync(ex, message).ConfigureAwait(false); } } } @@ -2755,7 +2755,39 @@ private async Task ProcessStreamMessageFireAndForgetAsync(string message) } catch (Exception ex) { - Debug.WriteLine($"{DateTime.Now}Stream message processing error: {ex.Message}"); + await NotifyStreamProcessingErrorAsync(ex, message).ConfigureAwait(false); + } + } + + /// + /// Surfaces an exception raised while processing a stream message — including exceptions + /// thrown by consumer stream handlers (e.g. , ) — + /// through the event instead of swallowing it into a debug trace, so consumer + /// bugs are observable. The message loop is always kept alive: cancellation is ignored, and an + /// exception thrown by the handler itself is contained. + /// + private async Task NotifyStreamProcessingErrorAsync(Exception ex, string message) + { + Debug.WriteLine($"{DateTime.Now}Stream message processing error: {ex.Message}"); + + if (ex is OperationCanceledException) + { + return; + } + + var handler = OnError; + if (handler is null) + { + return; + } + + try + { + await handler.Invoke(error: "error", errorMessage: "streamHandlerError", message: ex.Message, data: message).ConfigureAwait(false); + } + catch (Exception notifyEx) + { + Debug.WriteLine($"{DateTime.Now}OnError handler threw while reporting stream processing error: {notifyEx.Message}"); } } } \ No newline at end of file diff --git a/Xrpl/Xrpl.csproj b/Xrpl/Xrpl.csproj index c2d6d447..7e0d8cce 100644 --- a/Xrpl/Xrpl.csproj +++ b/Xrpl/Xrpl.csproj @@ -14,7 +14,7 @@ Apache-2.0 https://github.com/StaticBit-io/XrplCSharp XrplCSharp - 10.4.1.0 + 10.4.2.0