-
Notifications
You must be signed in to change notification settings - Fork 1
fix: thread-safe RequestManager and surface stream-handler errors (v10.4.2) #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -257,6 +257,28 @@ public async Task TestEmitsBookChanges() | |
| Assert.AreEqual("LESS", change.AssetB.ToString()); | ||
| } | ||
|
|
||
| [TestMethod] | ||
| public async Task StreamHandlerException_IsSurfacedViaOnError() | ||
| { | ||
|
Comment on lines
+260
to
+262
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tag this unit test with The new unit test should be discoverable in As per coding guidelines: “Tag unit tests with TestU filter category and integration tests with TestI filter category for test organization.” 🤖 Prompt for AI AgentsSource: Coding guidelines |
||
| TaskCompletionSource<string> errorReported = new TaskCompletionSource<string>(); | ||
|
|
||
| runner.client.connection.OnLedgerClosed += _ => | ||
| throw new InvalidOperationException("consumer handler bug"); | ||
|
|
||
| runner.client.connection.OnError += (error, errorMessage, message, data) => | ||
| { | ||
| errorReported.TrySetResult(errorMessage); | ||
| return Task.CompletedTask; | ||
| }; | ||
|
|
||
| string jsonString = "{\"fee_base\":10,\"fee_ref\":10,\"ledger_hash\":\"B3980C722D71873D6708723E71B7A28C826BC66C58712ADCEC61603415305CD1\",\"ledger_index\":66093872,\"ledger_time\":683942720,\"reserve_base\":20000000,\"reserve_inc\":5000000,\"txn_count\":70,\"type\":\"ledgerClosed\",\"validated_ledgers\":\"65201743-66093872\"}"; | ||
| await runner.client.connection.OnMessage(jsonString); | ||
|
|
||
| Task completed = await Task.WhenAny(errorReported.Task, Task.Delay(5000)); | ||
| Assert.AreEqual(errorReported.Task, completed, | ||
| "Exception thrown by a stream handler was swallowed instead of surfaced via OnError"); | ||
| } | ||
|
|
||
| [TestMethod] | ||
| public async Task TestEmitsServerStatus() | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| using Microsoft.VisualStudio.TestTools.UnitTesting; | ||
|
|
||
| using Xrpl.Client; | ||
| using Xrpl.Client.Exceptions; | ||
|
|
||
| namespace Xrpl.Tests.ClientLib | ||
| { | ||
| [TestClass] | ||
| public class TestURequestManagerConcurrency | ||
| { | ||
|
Comment on lines
+15
to
+17
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add Please tag this test class/methods with Proposed fix [TestClass]
+[TestCategory("TestU")]
public class TestURequestManagerConcurrencyAs per coding guidelines: “Tag unit tests with TestU filter category and integration tests with TestI filter category for test organization.” Also applies to: 59-60, 84-85 🤖 Prompt for AI AgentsSource: Coding guidelines |
||
| private class FakeRequest | ||
| { | ||
| public Guid? Id { get; set; } | ||
| } | ||
|
|
||
| private static void RunConcurrent(Action body, out IReadOnlyList<Exception> errors) | ||
| { | ||
| int threadCount = Math.Max(8, Environment.ProcessorCount * 2); | ||
| const int iterationsPerThread = 500; | ||
|
|
||
| ConcurrentBag<Exception> collected = new ConcurrentBag<Exception>(); | ||
| Barrier barrier = new Barrier(threadCount); | ||
| Thread[] threads = new Thread[threadCount]; | ||
|
|
||
| for (int t = 0; t < threadCount; t++) | ||
| { | ||
| threads[t] = new Thread(() => | ||
| { | ||
| barrier.SignalAndWait(); | ||
| for (int i = 0; i < iterationsPerThread; i++) | ||
| { | ||
| try | ||
| { | ||
| body(); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| collected.Add(ex); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| foreach (Thread thread in threads) | ||
| thread.Start(); | ||
| foreach (Thread thread in threads) | ||
| thread.Join(); | ||
|
|
||
| errors = collected.ToArray(); | ||
| } | ||
|
|
||
| [TestMethod] | ||
| public void CreateRequest_ConcurrentCalls_AssignUniqueIdsWithoutCollision() | ||
| { | ||
| RequestManager manager = new RequestManager(); | ||
| ConcurrentBag<Guid> ids = new ConcurrentBag<Guid>(); | ||
|
|
||
| RunConcurrent( | ||
| () => | ||
| { | ||
| RequestManager.XrplRequest request = manager.CreateRequest( | ||
| new Dictionary<string, object>(), | ||
| System.Threading.Timeout.InfiniteTimeSpan); | ||
| ids.Add(request.Id); | ||
| }, | ||
| out IReadOnlyList<Exception> errors); | ||
|
|
||
| Assert.AreEqual(0, errors.Count, | ||
| $"Concurrent CreateRequest threw {errors.Count} exception(s): " + | ||
| string.Join(" | ", errors.Take(3).Select(e => e.Message))); | ||
|
|
||
| List<Guid> all = ids.ToList(); | ||
| Assert.AreEqual(all.Count, all.Distinct().Count(), | ||
| "Concurrent CreateRequest assigned duplicate ids"); | ||
| } | ||
|
|
||
| [TestMethod] | ||
| public void CreateGRequest_ConcurrentCalls_AssignUniqueIdsWithoutCollision() | ||
| { | ||
| RequestManager manager = new RequestManager(); | ||
| ConcurrentBag<Guid> ids = new ConcurrentBag<Guid>(); | ||
|
|
||
| RunConcurrent( | ||
| () => | ||
| { | ||
| RequestManager.XrplGRequest request = manager.CreateGRequest<object, FakeRequest>( | ||
| new FakeRequest(), | ||
| System.Threading.Timeout.InfiniteTimeSpan); | ||
| ids.Add(request.Id); | ||
| }, | ||
| out IReadOnlyList<Exception> errors); | ||
|
|
||
| Assert.AreEqual(0, errors.Count, | ||
| $"Concurrent CreateGRequest threw {errors.Count} exception(s): " + | ||
| string.Join(" | ", errors.Take(3).Select(e => e.Message))); | ||
|
|
||
| List<Guid> all = ids.ToList(); | ||
| Assert.AreEqual(all.Count, all.Distinct().Count(), | ||
| "Concurrent CreateGRequest assigned duplicate ids"); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,7 +42,6 @@ public class XrplGRequest | |
| public Task<object> Promise { get; set; } | ||
| } | ||
|
|
||
| private Guid nextId = Guid.NewGuid(); | ||
| private readonly ConcurrentDictionary<Guid, Timer> timeoutsAwaitingResponse = new ConcurrentDictionary<Guid, Timer>(); | ||
| private readonly ConcurrentDictionary<Guid, TaskInfo> promisesAwaitingResponse = new ConcurrentDictionary<Guid, TaskInfo>(); | ||
| private readonly JsonSerializerOptions serializerOptions = XrplJsonOptions.Default; | ||
|
|
@@ -166,35 +165,25 @@ public XrplGRequest CreateGRequest<T, R>(R request, TimeSpan timeout, Cancellati | |
| $"Timeout must be positive or Timeout.InfiniteTimeSpan, but was {timeout.TotalSeconds:F1}s"); | ||
| } | ||
|
|
||
| Guid newId; | ||
| var info = request.GetType().GetProperty("Id"); | ||
| if (info.GetValue(request) == null) | ||
| { | ||
| newId = this.nextId; | ||
| this.nextId = Guid.NewGuid(); | ||
| } | ||
| else | ||
| { | ||
| newId = (Guid)info.GetValue(request); | ||
| } | ||
| object existingId = info.GetValue(request); | ||
| Guid newId = existingId == null ? Guid.NewGuid() : (Guid)existingId; | ||
|
Comment on lines
+169
to
+170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Normalize caller-provided ids before casting to Line 170 and Line 242 currently assume any provided id is already a non-empty Proposed fix- object existingId = info.GetValue(request);
- Guid newId = existingId == null ? Guid.NewGuid() : (Guid)existingId;
+ object? existingId = info.GetValue(request);
+ Guid newId = existingId switch
+ {
+ Guid g when g != Guid.Empty => g,
+ string s when Guid.TryParse(s, out var parsed) && parsed != Guid.Empty => parsed,
+ _ => Guid.NewGuid()
+ };
- Guid newId = hasId ? (Guid)id : Guid.NewGuid();
+ Guid newId = hasId
+ ? id switch
+ {
+ Guid g when g != Guid.Empty => g,
+ string s when Guid.TryParse(s, out var parsed) && parsed != Guid.Empty => parsed,
+ _ => Guid.NewGuid()
+ }
+ : Guid.NewGuid();Also applies to: 242-243 🤖 Prompt for AI Agents |
||
|
|
||
| info.SetValue(request, newId, null); | ||
|
|
||
| string newRequest = JsonSerializer.Serialize(request, serializerOptions); | ||
|
|
||
| if (this.promisesAwaitingResponse.ContainsKey(newId)) | ||
| { | ||
| throw new XrplException($"Response with id '${newId}' is already pending"); | ||
| } | ||
|
|
||
| TaskCompletionSource<object> task = new TaskCompletionSource<object>(); | ||
| TaskInfo taskInfo = new TaskInfo(); | ||
| taskInfo.TaskId = newId; | ||
| taskInfo.TaskCompletionResult = task; | ||
| taskInfo.RemoveUponCompletion = true; | ||
| taskInfo.Type = typeof(T); | ||
|
|
||
| promisesAwaitingResponse.TryAdd(newId, taskInfo); | ||
| if (!promisesAwaitingResponse.TryAdd(newId, taskInfo)) | ||
| { | ||
| throw new XrplException($"Response with id '${newId}' is already pending"); | ||
| } | ||
|
|
||
| if (cancellationToken.CanBeCanceled) | ||
| { | ||
|
|
@@ -249,35 +238,24 @@ public XrplRequest CreateRequest(Dictionary<string, object> request, TimeSpan ti | |
| $"Timeout must be positive or Timeout.InfiniteTimeSpan, but was {timeout.TotalSeconds:F1}s"); | ||
| } | ||
|
|
||
| Guid newId; | ||
| var hasId = request.TryGetValue("id", out var id); | ||
| if (!hasId) | ||
| { | ||
| newId = this.nextId; | ||
| this.nextId = Guid.NewGuid(); | ||
| } | ||
| else | ||
| { | ||
| newId = (Guid)id; | ||
| } | ||
| Guid newId = hasId ? (Guid)id : Guid.NewGuid(); | ||
|
|
||
| request["id"] = newId; | ||
|
|
||
| string newRequest = JsonSerializer.Serialize(request, serializerOptions); | ||
|
|
||
| if (this.promisesAwaitingResponse.ContainsKey(newId)) | ||
| { | ||
| throw new XrplException($"Response with id '${newId}' is already pending"); | ||
| } | ||
|
|
||
| TaskCompletionSource<Dictionary<string, object>> task = new TaskCompletionSource<Dictionary<string, object>>(); | ||
| TaskInfo taskInfo = new TaskInfo(); | ||
| taskInfo.TaskId = newId; | ||
| taskInfo.TaskCompletionResult = task; | ||
| taskInfo.RemoveUponCompletion = true; | ||
| taskInfo.Type = typeof(Dictionary<string, object>); | ||
|
|
||
| promisesAwaitingResponse.TryAdd(newId, taskInfo); | ||
| if (!promisesAwaitingResponse.TryAdd(newId, taskInfo)) | ||
| { | ||
| throw new XrplException($"Response with id '${newId}' is already pending"); | ||
| } | ||
|
|
||
| if (cancellationToken.CanBeCanceled) | ||
| { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix markdown heading level increment at Line 3.
### 10.4.2.0 ...jumps from# Changestoh3and triggers MD001; use##here (or add an intermediate##section).🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 3-3: Heading levels should only increment by one level at a time
Expected: h2; Actual: h3
(MD001, heading-increment)
🤖 Prompt for AI Agents
Source: Linters/SAST tools