From 745334dfe360cb586a7a76cb15c7015fc3cd695d Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Tue, 16 Jun 2026 14:41:45 -0500 Subject: [PATCH 1/8] Add dynamic transaction thread pool that grows on demand --- Common/Util/DynamicWorkerPool.cs | 319 ++++++++++++++++++ .../BrokerageTransactionHandler.cs | 102 +++--- Launcher/config.json | 4 +- Tests/Common/Util/DynamicWorkerPoolTests.cs | 280 +++++++++++++++ .../BrokerageTransactionHandlerTests.cs | 183 +++++++++- 5 files changed, 847 insertions(+), 41 deletions(-) create mode 100644 Common/Util/DynamicWorkerPool.cs create mode 100644 Tests/Common/Util/DynamicWorkerPoolTests.cs diff --git a/Common/Util/DynamicWorkerPool.cs b/Common/Util/DynamicWorkerPool.cs new file mode 100644 index 000000000000..33cc199ecca3 --- /dev/null +++ b/Common/Util/DynamicWorkerPool.cs @@ -0,0 +1,319 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; + +namespace QuantConnect.Util +{ + /// + /// A worker pool that routes items into a fixed number of partitions by key, keeping the routing + /// stable while the number of workers grows on demand from a minimum up to a maximum when busy. + /// Each partition is processed by a single worker at a time, so items sharing a key keep their order. + /// + /// The item type being processed + public class DynamicWorkerPool : IDisposable + { + private readonly Action _handler; + private readonly Action _onError; + private readonly string _threadName; + private readonly int _minWorkers; + private readonly int _maxWorkers; + + private readonly ConcurrentQueue[] _partitions; + // 0 = free, 1 = claimed; ensures at most one worker processes a partition at a time + private readonly int[] _claims; + private readonly ManualResetEventSlim _workAvailable; + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly List _workers; + private readonly object _workersLock = new object(); + + private int _activeWorkerCount; + private int _busyWorkers; + private bool _started; + + /// + /// The number of worker threads currently running + /// + public int WorkerCount => Volatile.Read(ref _activeWorkerCount); + + /// + /// The fixed number of partitions used to route items (equal to the maximum worker count) + /// + public int PartitionCount => _partitions.Length; + + /// + /// True while any partition has pending work or any worker is still processing an item + /// + public bool IsBusy => IsPoolBusy(); + + /// + /// Initializes a new instance of the class + /// + /// The action invoked to process each item + /// The number of worker threads to start with (at least 1) + /// The maximum number of worker threads the pool can grow to + /// Optional callback invoked when the handler throws an unexpected exception + /// Optional name prefix used for the worker threads + public DynamicWorkerPool(Action handler, int minWorkers, int maxWorkers, Action onError = null, string threadName = "DynamicWorkerPool") + { + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + _maxWorkers = Math.Max(1, maxWorkers); + _minWorkers = Math.Min(Math.Max(1, minWorkers), _maxWorkers); + _onError = onError; + _threadName = threadName; + + _partitions = new ConcurrentQueue[_maxWorkers]; + for (var i = 0; i < _maxWorkers; i++) + { + _partitions[i] = new ConcurrentQueue(); + } + _claims = new int[_maxWorkers]; + _workAvailable = new ManualResetEventSlim(false); + _cancellationTokenSource = new CancellationTokenSource(); + _workers = new List(_maxWorkers); + } + + /// + /// Starts the pool with the minimum number of worker threads. Idempotent. + /// + public void Start() + { + lock (_workersLock) + { + if (_started) + { + return; + } + _started = true; + + for (var i = 0; i < _minWorkers; i++) + { + _workers.Add(NewWorker(i)); + } + _activeWorkerCount = _minWorkers; + foreach (var worker in _workers) + { + worker.Start(); + } + } + } + + /// + /// Enqueues an item to be processed. Items are routed to a partition by , + /// so all items sharing the same key land on the same partition and keep their relative order. + /// + /// The routing key (e.g. an order id); the same key always maps to the same partition + /// The item to process + public void Enqueue(long key, T item) + { + var partition = (int)(key % _partitions.Length); + if (partition < 0) + { + partition += _partitions.Length; + } + _partitions[partition].Enqueue(item); + + // signal the workers and grow the pool if the partitions are starving + _workAvailable.Set(); + MaybeScaleUp(); + } + + /// + /// Waits until all partitions are empty and no worker is processing, or the timeout elapses + /// + /// True if the pool became idle, false on timeout + public bool WaitForIdle(TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (IsPoolBusy()) + { + if (DateTime.UtcNow >= deadline) + { + return false; + } + Thread.Sleep(1); + } + return true; + } + + /// + /// Stops the pool, signaling the workers to exit and waiting for them to finish + /// + public void Dispose() + { + if (!_cancellationTokenSource.IsCancellationRequested) + { + _cancellationTokenSource.Cancel(); + } + _workAvailable.Set(); + + lock (_workersLock) + { + foreach (var worker in _workers) + { + worker?.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource); + } + } + + _workAvailable.DisposeSafely(); + _cancellationTokenSource.DisposeSafely(); + } + + private Thread NewWorker(int id) + { + return new Thread(WorkerLoop) { IsBackground = true, Name = $"{_threadName} {id}" }; + } + + /// + /// Worker entry point. Scans the partitions, claiming and processing any that have pending work, + /// and blocks when there is none. + /// + private void WorkerLoop() + { + var token = _cancellationTokenSource.Token; + try + { + while (!token.IsCancellationRequested) + { + if (!ProcessAvailable()) + { + // no work found: reset and re-scan before blocking to avoid lost wake-ups + _workAvailable.Reset(); + if (!ProcessAvailable()) + { + _workAvailable.Wait(token); + } + } + } + } + catch (OperationCanceledException) + { + // shutting down + } + catch (Exception err) + { + _onError?.Invoke(err); + } + finally + { + Interlocked.Decrement(ref _activeWorkerCount); + } + } + + /// + /// Scans all partitions and processes the ones with pending work. A partition is claimed before + /// processing so at most one worker handles it at a time, preserving per-key ordering. + /// + /// True if any work was processed + private bool ProcessAvailable() + { + var worked = false; + for (var i = 0; i < _partitions.Length; i++) + { + var partition = _partitions[i]; + if (partition.IsEmpty) + { + continue; + } + + // claim the partition; if another worker owns it, skip and let that worker process it + if (Interlocked.CompareExchange(ref _claims[i], 1, 0) != 0) + { + continue; + } + + Interlocked.Increment(ref _busyWorkers); + try + { + while (partition.TryDequeue(out var item)) + { + _handler(item); + worked = true; + } + } + finally + { + Interlocked.Decrement(ref _busyWorkers); + Volatile.Write(ref _claims[i], 0); + } + + // items may have been added between our last dequeue and releasing the claim; + // make sure a worker wakes up to handle them + if (!partition.IsEmpty) + { + _workAvailable.Set(); + } + } + return worked; + } + + /// + /// Grows the pool by one worker (up to the maximum) when the partitions are starving, i.e. every + /// running worker is already busy at the moment new work is enqueued. + /// + private void MaybeScaleUp() + { + var active = Volatile.Read(ref _activeWorkerCount); + if (active >= _maxWorkers) + { + return; + } + + if (Volatile.Read(ref _busyWorkers) >= active) + { + TrySpawnWorker(); + } + } + + private void TrySpawnWorker() + { + lock (_workersLock) + { + if (!_started || _activeWorkerCount >= _maxWorkers || _cancellationTokenSource.IsCancellationRequested) + { + return; + } + + var worker = NewWorker(_workers.Count); + _workers.Add(worker); + _activeWorkerCount++; + worker.Start(); + } + + // wake the new worker (and any idle ones) to pick up the backlog + _workAvailable.Set(); + } + + private bool IsPoolBusy() + { + if (Volatile.Read(ref _busyWorkers) > 0) + { + return true; + } + for (var i = 0; i < _partitions.Length; i++) + { + if (!_partitions[i].IsEmpty) + { + return true; + } + } + return false; + } + } +} diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index 53bf641716c7..5722bd1e5571 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -75,7 +75,9 @@ public class BrokerageTransactionHandler : ITransactionHandler /// protected List> _orderRequestQueues { get; set; } - private List _processingThreads; + // Worker pool for concurrent order processing, routed by OrderId, growing on demand. Null in the synchronous backtest path. + private DynamicWorkerPool _pool; + private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); private readonly ConcurrentQueue _orderEvents = new ConcurrentQueue(); @@ -236,24 +238,45 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu /// protected virtual void InitializeTransactionThread() { - // multi threaded queue, used for live deployments - var processingThreadsCount = _brokerage.ConcurrencyEnabled - ? Config.GetInt("maximum-transaction-threads", 4) - : 1; - _orderRequestQueues = new(processingThreadsCount); - _processingThreads = new(processingThreadsCount); - for (var i = 0; i < processingThreadsCount; i++) - { - _orderRequestQueues.Add(new BusyBlockingCollection()); - var threadId = i; // avoid modified closure - _processingThreads.Add(new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {i}" }); - } - foreach (var thread in _processingThreads) - { - thread.Start(); - } + // The pool starts with the minimum number of workers and grows up to the maximum on demand. + // Requests are routed by OrderId, and each order is processed by a single worker at a time, + // which preserves the Submit/Update/Cancel ordering per OrderId even as the pool scales. + var maxThreads = _brokerage.ConcurrencyEnabled ? Math.Max(1, MaximumTransactionThreads) : 1; + var minThreads = _brokerage.ConcurrencyEnabled ? Math.Min(Math.Max(1, MinimumTransactionThreads), maxThreads) : 1; + + _pool = new DynamicWorkerPool( + request => + { + HandleOrderRequest(request); + ProcessAsynchronousEvents(); + }, + minThreads, + maxThreads, + onError: err => + { + // unexpected error, we need to close down shop + _algorithm.SetRuntimeError(err, "HandleOrderRequest"); + IsActive = false; + }, + threadName: "Transaction Thread"); + _pool.Start(); } + /// + /// The maximum number of worker threads the dynamic transaction thread pool can grow to + /// + protected virtual int MaximumTransactionThreads => Config.GetInt("maximum-transaction-threads", 10); + + /// + /// The number of worker threads the dynamic transaction thread pool starts with + /// + protected virtual int MinimumTransactionThreads => Config.GetInt("minimum-transaction-threads", 2); + + /// + /// The number of worker threads currently running in the dynamic transaction thread pool + /// + protected int ProcessingThreadsCount => _pool?.WorkerCount ?? 0; + /// /// Boolean flag indicating the Run thread method is busy. /// False indicates it is completely finished processing and ready to be terminated. @@ -674,7 +697,8 @@ public List GetOpenOrders(Func filter = null) } /// - /// Primary thread entry point to launch the transaction thread. + /// Processes the order request queue synchronously. Used by the backtesting transaction handler, + /// which processes order requests on the algorithm thread instead of using the worker pool. /// protected void Run(int threadId) { @@ -691,12 +715,6 @@ protected void Run(int threadId) // unexpected error, we need to close down shop _algorithm.SetRuntimeError(err, "HandleOrderRequest"); } - - if (_processingThreads != null) - { - Log.Trace($"BrokerageTransactionHandler.Run(): Ending Thread {threadId}..."); - IsActive = false; - } } /// @@ -717,7 +735,11 @@ public virtual void ProcessSynchronousEvents() // in backtesting we need to wait for orders to be removed from the queue and finished processing if (!_algorithm.LiveMode) { - if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))) + if (_orderRequestQueues != null && _orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))) + { + Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing."); + } + else if (_pool != null && !_pool.WaitForIdle(Time.OneSecond)) { Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing."); } @@ -800,23 +822,14 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm) public void Exit() { var timeout = TimeSpan.FromSeconds(60); - if (_processingThreads != null) + if (_pool != null) { - // only wait if the processing thread is running - if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout))) + // wait for the pool to finish processing pending requests, then stop the workers + if (!_pool.WaitForIdle(timeout)) { Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds."); } - - foreach (var queue in _orderRequestQueues) - { - queue.CompleteAdding(); - } - - foreach (var thread in _processingThreads) - { - thread?.StopSafely(timeout, _cancellationTokenSource); - } + _pool.DisposeSafely(); } IsActive = false; _cancellationTokenSource.DisposeSafely(); @@ -1939,12 +1952,23 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity) private void EnqueueOrderRequest(OrderRequest request, Order order) { + // route by OrderId (or combo group id) so all requests for the same order are processed + // in order by a single worker; the pool keeps the routing stable as it scales var queueKey = request.OrderId; if (order.GroupOrderManager?.Id > 0) { queueKey = order.GroupOrderManager.Id; } - _orderRequestQueues[queueKey % _orderRequestQueues.Count].Add(request); + + if (_pool != null) + { + _pool.Enqueue(queueKey, request); + } + else + { + // synchronous backtest path: a single queue processed on the algorithm thread + _orderRequestQueues[(int)(queueKey % _orderRequestQueues.Count)].Add(request); + } } /// diff --git a/Launcher/config.json b/Launcher/config.json index f0498a0996ab..03af8dd9c5f9 100644 --- a/Launcher/config.json +++ b/Launcher/config.json @@ -58,7 +58,9 @@ "ignore-unknown-asset-holdings": true, // The maximum amount of transaction threads for concurrent order submissions if the brokerage supports it. - //"maximum-transaction-threads": 4, + // The pool starts at the minimum and grows up to the maximum on demand. + //"minimum-transaction-threads": 2, + //"maximum-transaction-threads": 10, // log missing data files, useful for debugging "show-missing-data-logs": false, diff --git a/Tests/Common/Util/DynamicWorkerPoolTests.cs b/Tests/Common/Util/DynamicWorkerPoolTests.cs new file mode 100644 index 000000000000..22eb6c5db31f --- /dev/null +++ b/Tests/Common/Util/DynamicWorkerPoolTests.cs @@ -0,0 +1,280 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using NUnit.Framework; +using QuantConnect.Util; + +namespace QuantConnect.Tests.Common.Util +{ + [TestFixture] + public class DynamicWorkerPoolTests + { + [Test] + public void StartsWithMinimumWorkers() + { + using var pool = new DynamicWorkerPool(_ => { }, minWorkers: 2, maxWorkers: 10); + pool.Start(); + + Assert.AreEqual(2, pool.WorkerCount); + Assert.AreEqual(10, pool.PartitionCount); + } + + [Test] + public void ClampsMinAndMaxWorkers() + { + // min is clamped to at least 1, and to at most max + using var pool = new DynamicWorkerPool(_ => { }, minWorkers: 0, maxWorkers: 1); + pool.Start(); + + Assert.AreEqual(1, pool.WorkerCount); + Assert.AreEqual(1, pool.PartitionCount); + } + + [Test] + public void ThrowsOnNullHandler() + { + Assert.Throws(() => new DynamicWorkerPool(null, 1, 2)); + } + + [Test] + public void ProcessesAllEnqueuedItems() + { + const int count = 200; + var processed = new ConcurrentBag(); + using var done = new CountdownEvent(count); + using var pool = new DynamicWorkerPool(i => + { + processed.Add(i); + done.Signal(); + }, minWorkers: 2, maxWorkers: 10); + pool.Start(); + + for (var i = 0; i < count; i++) + { + pool.Enqueue(i, i); + } + + Assert.IsTrue(done.Wait(10000)); + CollectionAssert.AreEquivalent(Enumerable.Range(0, count), processed); + } + + [Test] + public void GrowsUnderBacklogUpToMaximum([Values(10, 3)] int maxWorkers) + { + using var gate = new ManualResetEventSlim(false); + using var pool = new DynamicWorkerPool(_ => gate.Wait(), minWorkers: 2, maxWorkers: maxWorkers); + pool.Start(); + Assert.AreEqual(2, pool.WorkerCount); + + try + { + // keep feeding work while the workers stay busy on the gate, so the starving pool grows + var key = 0; + var reachedMax = SpinWait.SpinUntil(() => + { + if (key < 1000) + { + pool.Enqueue(key, key); + key++; + } + return pool.WorkerCount >= maxWorkers; + }, 10000); + + Assert.IsTrue(reachedMax, $"Pool did not grow to the maximum, current size: {pool.WorkerCount}"); + // never grows beyond the configured maximum + Assert.AreEqual(maxWorkers, pool.WorkerCount); + } + finally + { + gate.Set(); + } + } + + [Test] + public void DoesNotGrowWhenWorkersKeepUp() + { + // workers process instantly, so there is never a starving backlog and the pool stays minimal + using var pool = new DynamicWorkerPool(_ => { }, minWorkers: 2, maxWorkers: 10); + pool.Start(); + + for (var i = 0; i < 50; i++) + { + pool.Enqueue(i, i); + Thread.Sleep(1); + } + + Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(5))); + Assert.AreEqual(2, pool.WorkerCount); + } + + [Test] + public void PreservesOrderPerKey() + { + const int maxWorkers = 10; + const int keysCount = maxWorkers; // one logical key per partition + const int itemsPerKey = 50; + using var gate = new ManualResetEventSlim(false); + var sequence = new ConcurrentQueue<(int Key, int Value)>(); + + using var pool = new DynamicWorkerPool<(int Key, int Value)>(item => + { + gate.Wait(); + sequence.Enqueue(item); + }, minWorkers: 2, maxWorkers: maxWorkers); + pool.Start(); + + // interleave items across keys; items with the same key must keep their relative order + for (var n = 0; n < itemsPerKey; n++) + { + for (var key = 0; key < keysCount; key++) + { + pool.Enqueue(key, (key, n)); + } + } + + gate.Set(); + Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(10))); + + foreach (var group in sequence.ToList().GroupBy(x => x.Key)) + { + var values = group.Select(x => x.Value).ToList(); + CollectionAssert.AreEqual(Enumerable.Range(0, itemsPerKey).ToList(), values, + $"key {group.Key} was processed out of order"); + } + } + + [Test] + public void NeverProcessesSamePartitionConcurrently() + { + const int maxWorkers = 10; + var active = new ConcurrentDictionary(); + var overlapDetected = 0; + const int count = 2000; + using var done = new CountdownEvent(count); + + using var pool = new DynamicWorkerPool(item => + { + // items sharing item % maxWorkers land on the same partition and must never overlap + var partition = item % maxWorkers; + if (active.AddOrUpdate(partition, 1, (_, c) => c + 1) > 1) + { + Interlocked.Exchange(ref overlapDetected, 1); + } + Thread.SpinWait(50); + active.AddOrUpdate(partition, 0, (_, c) => c - 1); + done.Signal(); + }, minWorkers: 4, maxWorkers: maxWorkers); + pool.Start(); + + for (var i = 0; i < count; i++) + { + // many distinct keys colliding on the same partitions (key % maxWorkers) + pool.Enqueue(i, i); + } + + Assert.IsTrue(done.Wait(15000)); + Assert.AreEqual(0, overlapDetected, "the same partition was processed by two workers at once"); + } + + [Test] + public void WaitForIdleReturnsFalseOnTimeoutAndTrueWhenDrained() + { + using var gate = new ManualResetEventSlim(false); + using var pool = new DynamicWorkerPool(_ => gate.Wait(), minWorkers: 2, maxWorkers: 4); + pool.Start(); + + pool.Enqueue(0, 0); + // a worker is stuck on the gate, so the pool is busy + Assert.IsFalse(pool.WaitForIdle(TimeSpan.FromMilliseconds(200))); + + gate.Set(); + Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(5))); + Assert.IsFalse(pool.IsBusy); + } + + [Test] + public void InvokesOnErrorWhenHandlerThrows() + { + using var raised = new ManualResetEventSlim(false); + Exception captured = null; + using var pool = new DynamicWorkerPool( + _ => throw new InvalidOperationException("boom"), + minWorkers: 1, + maxWorkers: 1, + onError: err => { captured = err; raised.Set(); }); + pool.Start(); + + pool.Enqueue(0, 0); + + Assert.IsTrue(raised.Wait(5000)); + Assert.IsInstanceOf(captured); + } + + [Test] + public void EnqueueRoutesNegativeKeysToValidPartition() + { + var processed = new ConcurrentBag(); + using var done = new CountdownEvent(4); + using var pool = new DynamicWorkerPool(i => { processed.Add(i); done.Signal(); }, 1, 4); + pool.Start(); + + // negative keys must still map to a valid partition without throwing + pool.Enqueue(-1, 10); + pool.Enqueue(-7, 20); + pool.Enqueue(-13, 30); + pool.Enqueue(-100, 40); + + Assert.IsTrue(done.Wait(5000)); + CollectionAssert.AreEquivalent(new[] { 10, 20, 30, 40 }, processed); + } + + [Test] + public void DisposeStopsWorkers() + { + var pool = new DynamicWorkerPool(_ => { }, 2, 4); + pool.Start(); + pool.Enqueue(0, 0); + Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(5))); + + Assert.DoesNotThrow(() => pool.Dispose()); + // disposing again is safe + Assert.DoesNotThrow(() => pool.Dispose()); + } + + [Test] + public void EnqueueBeforeStartIsProcessedOnStart() + { + var processed = new ConcurrentBag(); + using var done = new CountdownEvent(3); + using var pool = new DynamicWorkerPool(i => { processed.Add(i); done.Signal(); }, 2, 4); + + // enqueue before Start: items wait in their partitions until workers come up + pool.Enqueue(0, 1); + pool.Enqueue(1, 2); + pool.Enqueue(2, 3); + + pool.Start(); + + Assert.IsTrue(done.Wait(5000)); + CollectionAssert.AreEquivalent(new[] { 1, 2, 3 }, processed); + } + } +} diff --git a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs index ffb9fc557da2..eb63a57f2cdd 100644 --- a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs +++ b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs @@ -38,6 +38,7 @@ using System.Linq; using System.Reflection; using System.Threading; +using System.Threading.Tasks; using HistoryRequest = QuantConnect.Data.HistoryRequest; namespace QuantConnect.Tests.Engine.BrokerageTransactionHandlerTests @@ -2514,6 +2515,158 @@ public void ProcessesComboRequestsOnSameThreadWhenConcurrencyIsEnabled() } } + [Test] + public void TransactionThreadPoolStartsAtMinimumThreads() + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + using var finishedEvent = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(1, finishedEvent); + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + // the pool starts with the minimum number of worker threads and grows only on demand + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + } + finally + { + transactionHandler.Exit(); + } + } + + [TestCase(10)] + [TestCase(3)] + public void TransactionThreadPoolGrowsUnderBacklogUpToMaximum(int maximumThreads) + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + + using var finishedEvent = new ManualResetEventSlim(false); + using var gate = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) + { + Gate = gate, + MaxThreadsOverride = maximumThreads + }; + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + algorithm.Transactions.SetOrderProcessor(transactionHandler); + + var security = (Security)algorithm.AddEquity("SPY"); + algorithm.SetFinishedWarmingUp(); + + var reference = new DateTime(2025, 07, 03, 10, 0, 0); + security.SetMarketPrice(new Tick(reference, security.Symbol, 300, 300)); + + // starts at the minimum + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + + // keep feeding orders while the workers stay busy on the gate (sustained saturation), + // which is what makes the starving pool grow up to the configured maximum + var orderId = 0; + var reachedMax = SpinWait.SpinUntil(() => + { + if (orderId < 1000) + { + var request = MakeAsyncMarketRequest(security, reference); + request.SetOrderId(++orderId); + transactionHandler.Process(request); + } + return transactionHandler.ActiveThreadCount >= maximumThreads; + }, 10000); + + Assert.IsTrue(reachedMax, $"Pool did not grow to the maximum, current size: {transactionHandler.ActiveThreadCount}"); + // never grows beyond the configured maximum + Assert.AreEqual(maximumThreads, transactionHandler.ActiveThreadCount); + } + finally + { + gate.Set(); + transactionHandler.Exit(); + } + } + + [Test] + public void PreservesRequestOrderPerPartitionUnderScaling() + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + + const int maxThreads = 10; + using var finishedEvent = new ManualResetEventSlim(false); + using var gate = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) + { + Gate = gate, + MaxThreadsOverride = maxThreads, + // isolate the pool's delivery ordering from the order/brokerage pipeline + RecordOnly = true + }; + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + algorithm.Transactions.SetOrderProcessor(transactionHandler); + + var security = (Security)algorithm.AddEquity("SPY"); + algorithm.SetFinishedWarmingUp(); + + var reference = new DateTime(2025, 07, 03, 10, 0, 0); + security.SetMarketPrice(new Tick(reference, security.Symbol, 300, 300)); + + // feed orders across all partitions while workers block on the gate, so the backlog grows the pool to the maximum + var orderId = 0; + var reachedMax = SpinWait.SpinUntil(() => + { + if (orderId < 1000) + { + var request = MakeAsyncMarketRequest(security, reference); + request.SetOrderId(++orderId); + transactionHandler.Process(request); + } + return transactionHandler.ActiveThreadCount >= maxThreads; + }, 10000); + Assert.IsTrue(reachedMax, $"Pool did not grow to the maximum, current size: {transactionHandler.ActiveThreadCount}"); + + // keep a healthy backlog on every partition before releasing the workers + for (var i = 0; i < maxThreads * 5; i++) + { + var request = MakeAsyncMarketRequest(security, reference); + request.SetOrderId(++orderId); + transactionHandler.Process(request); + } + var enqueued = orderId; + + // release the workers and wait until the whole backlog drains + gate.Set(); + Assert.IsTrue(SpinWait.SpinUntil(() => transactionHandler.ProcessingSequence.Count >= enqueued, 15000), + $"processed {transactionHandler.ProcessingSequence.Count}/{enqueued}"); + + // within each partition, requests must keep their enqueue order (ascending OrderId) despite the pool scaling up + var processed = transactionHandler.ProcessingSequence.ToList(); + foreach (var partition in processed.GroupBy(x => x.OrderId % maxThreads)) + { + var ids = partition.Select(x => x.OrderId).ToList(); + CollectionAssert.AreEqual(ids.OrderBy(x => x).ToList(), ids, + $"partition {partition.Key} was processed out of order: {string.Join(",", ids)}"); + } + } + finally + { + gate.Set(); + transactionHandler.Exit(); + } + } + + private static SubmitOrderRequest MakeAsyncMarketRequest(Security security, DateTime date) + { + return new SubmitOrderRequest(OrderType.Market, security.Type, security.Symbol, 1, 0, 0, 0, 0, false, date, "", + asynchronous: true); + } + [TestCase("OnAccountChanged")] [TestCase("OnOptionNotification")] [TestCase("OnNewBrokerageOrderNotification")] @@ -2875,6 +3028,24 @@ private class TestableConcurrentBrokerageTransactionHandler : BrokerageTransacti public ConcurrentDictionary RequestProcessingThreads = new(); + // ordered record of processed requests, to assert per-OrderId ordering + public ConcurrentQueue<(int OrderId, OrderRequestType Type)> ProcessingSequence = new(); + + // blocks workers to force a backlog + public ManualResetEventSlim Gate; + + // slows workers down to let a backlog build up + public int ProcessingDelayMs; + + // only record the delivery order, skipping the base order pipeline + public bool RecordOnly; + + public int ActiveThreadCount => ProcessingThreadsCount; + + // overrides the pool maximum without touching the global Config + public int? MaxThreadsOverride { get; set; } + protected override int MaximumTransactionThreads => MaxThreadsOverride ?? base.MaximumTransactionThreads; + public TestableConcurrentBrokerageTransactionHandler(int expectedOrdersCount, ManualResetEventSlim finishedEvent) { _expectedOrdersCount = expectedOrdersCount; @@ -2883,7 +3054,16 @@ public TestableConcurrentBrokerageTransactionHandler(int expectedOrdersCount, Ma public override void HandleOrderRequest(OrderRequest request) { - base.HandleOrderRequest(request); + Gate?.Wait(); + if (ProcessingDelayMs > 0) + { + Thread.Sleep(ProcessingDelayMs); + } + + if (!RecordOnly) + { + base.HandleOrderRequest(request); + } // Capture the thread name for debugging purposes var threadName = Thread.CurrentThread.Name ?? Environment.CurrentManagedThreadId.ToString(); @@ -2894,6 +3074,7 @@ public override void HandleOrderRequest(OrderRequest request) RequestProcessingThreads[request.OrderId] = threadName; ProcessedRequests.Add(request); + ProcessingSequence.Enqueue((request.OrderId, request.OrderRequestType)); if (Interlocked.Increment(ref _currentOrdersCount) >= _expectedOrdersCount) { From 5f2d988b6467b14f721436380ecce5b490c7e830 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Wed, 17 Jun 2026 13:17:01 -0500 Subject: [PATCH 2/8] Simplify dynamic worker pool --- Common/Util/DynamicWorkerPool.cs | 259 ++++++------------ Tests/Common/Util/DynamicWorkerPoolTests.cs | 122 +-------- .../BrokerageTransactionHandlerTests.cs | 91 +----- 3 files changed, 94 insertions(+), 378 deletions(-) diff --git a/Common/Util/DynamicWorkerPool.cs b/Common/Util/DynamicWorkerPool.cs index 33cc199ecca3..d2bd3af2811c 100644 --- a/Common/Util/DynamicWorkerPool.cs +++ b/Common/Util/DynamicWorkerPool.cs @@ -15,16 +15,15 @@ */ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; namespace QuantConnect.Util { /// - /// A worker pool that routes items into a fixed number of partitions by key, keeping the routing - /// stable while the number of workers grows on demand from a minimum up to a maximum when busy. - /// Each partition is processed by a single worker at a time, so items sharing a key keep their order. + /// A worker pool that routes items into queues by key and processes each queue with its own thread. + /// It starts with a minimum number of workers and adds more on demand (up to a maximum) when a queue + /// starts to pile up. Items sharing a key go to the same queue, so they keep their relative order. /// /// The item type being processed public class DynamicWorkerPool : IDisposable @@ -35,32 +34,39 @@ public class DynamicWorkerPool : IDisposable private readonly int _minWorkers; private readonly int _maxWorkers; - private readonly ConcurrentQueue[] _partitions; - // 0 = free, 1 = claimed; ensures at most one worker processes a partition at a time - private readonly int[] _claims; - private readonly ManualResetEventSlim _workAvailable; - private readonly CancellationTokenSource _cancellationTokenSource; + private readonly BusyBlockingCollection[] _queues; private readonly List _workers; - private readonly object _workersLock = new object(); - - private int _activeWorkerCount; - private int _busyWorkers; - private bool _started; + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly object _lock = new object(); + private int _activeWorkers; + private bool _disposed; /// /// The number of worker threads currently running /// - public int WorkerCount => Volatile.Read(ref _activeWorkerCount); - - /// - /// The fixed number of partitions used to route items (equal to the maximum worker count) - /// - public int PartitionCount => _partitions.Length; + public int WorkerCount => Volatile.Read(ref _activeWorkers); /// - /// True while any partition has pending work or any worker is still processing an item + /// True while any queue still has items to process /// - public bool IsBusy => IsPoolBusy(); + public bool IsBusy + { + get + { + if (Volatile.Read(ref _disposed)) + { + return false; + } + for (var i = 0; i < _queues.Length; i++) + { + if (_queues[i].IsBusy) + { + return true; + } + } + return false; + } + } /// /// Initializes a new instance of the class @@ -78,15 +84,13 @@ public DynamicWorkerPool(Action handler, int minWorkers, int maxWorkers, Acti _onError = onError; _threadName = threadName; - _partitions = new ConcurrentQueue[_maxWorkers]; + _queues = new BusyBlockingCollection[_maxWorkers]; for (var i = 0; i < _maxWorkers; i++) { - _partitions[i] = new ConcurrentQueue(); + _queues[i] = new BusyBlockingCollection(); } - _claims = new int[_maxWorkers]; - _workAvailable = new ManualResetEventSlim(false); - _cancellationTokenSource = new CancellationTokenSource(); _workers = new List(_maxWorkers); + _cancellationTokenSource = new CancellationTokenSource(); } /// @@ -94,54 +98,52 @@ public DynamicWorkerPool(Action handler, int minWorkers, int maxWorkers, Acti /// public void Start() { - lock (_workersLock) + lock (_lock) { - if (_started) + if (_workers.Count > 0) { return; } - _started = true; - for (var i = 0; i < _minWorkers; i++) { - _workers.Add(NewWorker(i)); - } - _activeWorkerCount = _minWorkers; - foreach (var worker in _workers) - { - worker.Start(); + StartWorker(i); } + _activeWorkers = _minWorkers; } } /// - /// Enqueues an item to be processed. Items are routed to a partition by , - /// so all items sharing the same key land on the same partition and keep their relative order. + /// Routes an item to a queue by and adds a worker if that queue is piling up /// - /// The routing key (e.g. an order id); the same key always maps to the same partition + /// The routing key; the same key maps to the same queue while the pool size is stable /// The item to process public void Enqueue(long key, T item) { - var partition = (int)(key % _partitions.Length); - if (partition < 0) + var active = Volatile.Read(ref _activeWorkers); + var index = (int)(key % active); + if (index < 0) { - partition += _partitions.Length; + index += active; } - _partitions[partition].Enqueue(item); - // signal the workers and grow the pool if the partitions are starving - _workAvailable.Set(); - MaybeScaleUp(); + var queue = _queues[index]; + queue.Add(item); + + // the queue is piling up faster than its worker can process it: grow the pool + if (active < _maxWorkers && queue.Count > 1) + { + Grow(); + } } /// - /// Waits until all partitions are empty and no worker is processing, or the timeout elapses + /// Waits until all queues are empty and idle, or the timeout elapses /// /// True if the pool became idle, false on timeout public bool WaitForIdle(TimeSpan timeout) { var deadline = DateTime.UtcNow + timeout; - while (IsPoolBusy()) + while (IsBusy) { if (DateTime.UtcNow >= deadline) { @@ -157,163 +159,74 @@ public bool WaitForIdle(TimeSpan timeout) /// public void Dispose() { - if (!_cancellationTokenSource.IsCancellationRequested) - { - _cancellationTokenSource.Cancel(); - } - _workAvailable.Set(); - - lock (_workersLock) + lock (_lock) { - foreach (var worker in _workers) + if (_disposed) { - worker?.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource); + return; } + _disposed = true; } - _workAvailable.DisposeSafely(); - _cancellationTokenSource.DisposeSafely(); - } - - private Thread NewWorker(int id) - { - return new Thread(WorkerLoop) { IsBackground = true, Name = $"{_threadName} {id}" }; - } + _cancellationTokenSource.Cancel(); + foreach (var queue in _queues) + { + queue.CompleteAdding(); + } - /// - /// Worker entry point. Scans the partitions, claiming and processing any that have pending work, - /// and blocks when there is none. - /// - private void WorkerLoop() - { - var token = _cancellationTokenSource.Token; - try + lock (_lock) { - while (!token.IsCancellationRequested) + foreach (var worker in _workers) { - if (!ProcessAvailable()) - { - // no work found: reset and re-scan before blocking to avoid lost wake-ups - _workAvailable.Reset(); - if (!ProcessAvailable()) - { - _workAvailable.Wait(token); - } - } + worker?.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource); } } - catch (OperationCanceledException) - { - // shutting down - } - catch (Exception err) - { - _onError?.Invoke(err); - } - finally + + foreach (var queue in _queues) { - Interlocked.Decrement(ref _activeWorkerCount); + queue.DisposeSafely(); } + _cancellationTokenSource.DisposeSafely(); } - /// - /// Scans all partitions and processes the ones with pending work. A partition is claimed before - /// processing so at most one worker handles it at a time, preserving per-key ordering. - /// - /// True if any work was processed - private bool ProcessAvailable() + private void Grow() { - var worked = false; - for (var i = 0; i < _partitions.Length; i++) + lock (_lock) { - var partition = _partitions[i]; - if (partition.IsEmpty) - { - continue; - } - - // claim the partition; if another worker owns it, skip and let that worker process it - if (Interlocked.CompareExchange(ref _claims[i], 1, 0) != 0) - { - continue; - } - - Interlocked.Increment(ref _busyWorkers); - try - { - while (partition.TryDequeue(out var item)) - { - _handler(item); - worked = true; - } - } - finally - { - Interlocked.Decrement(ref _busyWorkers); - Volatile.Write(ref _claims[i], 0); - } - - // items may have been added between our last dequeue and releasing the claim; - // make sure a worker wakes up to handle them - if (!partition.IsEmpty) + if (_activeWorkers >= _maxWorkers || _cancellationTokenSource.IsCancellationRequested) { - _workAvailable.Set(); + return; } + StartWorker(_activeWorkers); + // publish the new worker only after it has started so routing never targets a missing queue + _activeWorkers++; } - return worked; } - /// - /// Grows the pool by one worker (up to the maximum) when the partitions are starving, i.e. every - /// running worker is already busy at the moment new work is enqueued. - /// - private void MaybeScaleUp() + private void StartWorker(int index) { - var active = Volatile.Read(ref _activeWorkerCount); - if (active >= _maxWorkers) - { - return; - } - - if (Volatile.Read(ref _busyWorkers) >= active) - { - TrySpawnWorker(); - } + var worker = new Thread(() => WorkerLoop(index)) { IsBackground = true, Name = $"{_threadName} {index}" }; + _workers.Add(worker); + worker.Start(); } - private void TrySpawnWorker() + private void WorkerLoop(int index) { - lock (_workersLock) + try { - if (!_started || _activeWorkerCount >= _maxWorkers || _cancellationTokenSource.IsCancellationRequested) + foreach (var item in _queues[index].GetConsumingEnumerable(_cancellationTokenSource.Token)) { - return; + _handler(item); } - - var worker = NewWorker(_workers.Count); - _workers.Add(worker); - _activeWorkerCount++; - worker.Start(); } - - // wake the new worker (and any idle ones) to pick up the backlog - _workAvailable.Set(); - } - - private bool IsPoolBusy() - { - if (Volatile.Read(ref _busyWorkers) > 0) + catch (OperationCanceledException) { - return true; + // shutting down } - for (var i = 0; i < _partitions.Length; i++) + catch (Exception err) { - if (!_partitions[i].IsEmpty) - { - return true; - } + _onError?.Invoke(err); } - return false; } } } diff --git a/Tests/Common/Util/DynamicWorkerPoolTests.cs b/Tests/Common/Util/DynamicWorkerPoolTests.cs index 22eb6c5db31f..9860aad881e0 100644 --- a/Tests/Common/Util/DynamicWorkerPoolTests.cs +++ b/Tests/Common/Util/DynamicWorkerPoolTests.cs @@ -16,7 +16,6 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Linq; using System.Threading; using NUnit.Framework; @@ -34,24 +33,6 @@ public void StartsWithMinimumWorkers() pool.Start(); Assert.AreEqual(2, pool.WorkerCount); - Assert.AreEqual(10, pool.PartitionCount); - } - - [Test] - public void ClampsMinAndMaxWorkers() - { - // min is clamped to at least 1, and to at most max - using var pool = new DynamicWorkerPool(_ => { }, minWorkers: 0, maxWorkers: 1); - pool.Start(); - - Assert.AreEqual(1, pool.WorkerCount); - Assert.AreEqual(1, pool.PartitionCount); - } - - [Test] - public void ThrowsOnNullHandler() - { - Assert.Throws(() => new DynamicWorkerPool(null, 1, 2)); } [Test] @@ -86,7 +67,7 @@ public void GrowsUnderBacklogUpToMaximum([Values(10, 3)] int maxWorkers) try { - // keep feeding work while the workers stay busy on the gate, so the starving pool grows + // workers block on the gate, so the queues pile up and the pool grows to the maximum var key = 0; var reachedMax = SpinWait.SpinUntil(() => { @@ -111,7 +92,7 @@ public void GrowsUnderBacklogUpToMaximum([Values(10, 3)] int maxWorkers) [Test] public void DoesNotGrowWhenWorkersKeepUp() { - // workers process instantly, so there is never a starving backlog and the pool stays minimal + // workers process instantly, so the queues never pile up and the pool stays minimal using var pool = new DynamicWorkerPool(_ => { }, minWorkers: 2, maxWorkers: 10); pool.Start(); @@ -128,29 +109,22 @@ public void DoesNotGrowWhenWorkersKeepUp() [Test] public void PreservesOrderPerKey() { - const int maxWorkers = 10; - const int keysCount = maxWorkers; // one logical key per partition + // with a stable pool size, items sharing a key go to the same queue and keep their order + const int workers = 10; const int itemsPerKey = 50; - using var gate = new ManualResetEventSlim(false); var sequence = new ConcurrentQueue<(int Key, int Value)>(); - - using var pool = new DynamicWorkerPool<(int Key, int Value)>(item => - { - gate.Wait(); - sequence.Enqueue(item); - }, minWorkers: 2, maxWorkers: maxWorkers); + using var pool = new DynamicWorkerPool<(int Key, int Value)>(sequence.Enqueue, + minWorkers: workers, maxWorkers: workers); pool.Start(); - // interleave items across keys; items with the same key must keep their relative order for (var n = 0; n < itemsPerKey; n++) { - for (var key = 0; key < keysCount; key++) + for (var key = 0; key < workers; key++) { pool.Enqueue(key, (key, n)); } } - gate.Set(); Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(10))); foreach (var group in sequence.ToList().GroupBy(x => x.Key)) @@ -161,39 +135,6 @@ public void PreservesOrderPerKey() } } - [Test] - public void NeverProcessesSamePartitionConcurrently() - { - const int maxWorkers = 10; - var active = new ConcurrentDictionary(); - var overlapDetected = 0; - const int count = 2000; - using var done = new CountdownEvent(count); - - using var pool = new DynamicWorkerPool(item => - { - // items sharing item % maxWorkers land on the same partition and must never overlap - var partition = item % maxWorkers; - if (active.AddOrUpdate(partition, 1, (_, c) => c + 1) > 1) - { - Interlocked.Exchange(ref overlapDetected, 1); - } - Thread.SpinWait(50); - active.AddOrUpdate(partition, 0, (_, c) => c - 1); - done.Signal(); - }, minWorkers: 4, maxWorkers: maxWorkers); - pool.Start(); - - for (var i = 0; i < count; i++) - { - // many distinct keys colliding on the same partitions (key % maxWorkers) - pool.Enqueue(i, i); - } - - Assert.IsTrue(done.Wait(15000)); - Assert.AreEqual(0, overlapDetected, "the same partition was processed by two workers at once"); - } - [Test] public void WaitForIdleReturnsFalseOnTimeoutAndTrueWhenDrained() { @@ -227,54 +168,5 @@ public void InvokesOnErrorWhenHandlerThrows() Assert.IsTrue(raised.Wait(5000)); Assert.IsInstanceOf(captured); } - - [Test] - public void EnqueueRoutesNegativeKeysToValidPartition() - { - var processed = new ConcurrentBag(); - using var done = new CountdownEvent(4); - using var pool = new DynamicWorkerPool(i => { processed.Add(i); done.Signal(); }, 1, 4); - pool.Start(); - - // negative keys must still map to a valid partition without throwing - pool.Enqueue(-1, 10); - pool.Enqueue(-7, 20); - pool.Enqueue(-13, 30); - pool.Enqueue(-100, 40); - - Assert.IsTrue(done.Wait(5000)); - CollectionAssert.AreEquivalent(new[] { 10, 20, 30, 40 }, processed); - } - - [Test] - public void DisposeStopsWorkers() - { - var pool = new DynamicWorkerPool(_ => { }, 2, 4); - pool.Start(); - pool.Enqueue(0, 0); - Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(5))); - - Assert.DoesNotThrow(() => pool.Dispose()); - // disposing again is safe - Assert.DoesNotThrow(() => pool.Dispose()); - } - - [Test] - public void EnqueueBeforeStartIsProcessedOnStart() - { - var processed = new ConcurrentBag(); - using var done = new CountdownEvent(3); - using var pool = new DynamicWorkerPool(i => { processed.Add(i); done.Signal(); }, 2, 4); - - // enqueue before Start: items wait in their partitions until workers come up - pool.Enqueue(0, 1); - pool.Enqueue(1, 2); - pool.Enqueue(2, 3); - - pool.Start(); - - Assert.IsTrue(done.Wait(5000)); - CollectionAssert.AreEquivalent(new[] { 1, 2, 3 }, processed); - } } } diff --git a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs index eb63a57f2cdd..d9120cf59d07 100644 --- a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs +++ b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs @@ -2589,78 +2589,6 @@ public void TransactionThreadPoolGrowsUnderBacklogUpToMaximum(int maximumThreads } } - [Test] - public void PreservesRequestOrderPerPartitionUnderScaling() - { - var algorithm = new TestAlgorithm(); - using var brokerage = new TestingConcurrentBrokerage(); - - const int maxThreads = 10; - using var finishedEvent = new ManualResetEventSlim(false); - using var gate = new ManualResetEventSlim(false); - var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) - { - Gate = gate, - MaxThreadsOverride = maxThreads, - // isolate the pool's delivery ordering from the order/brokerage pipeline - RecordOnly = true - }; - transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); - - try - { - algorithm.Transactions.SetOrderProcessor(transactionHandler); - - var security = (Security)algorithm.AddEquity("SPY"); - algorithm.SetFinishedWarmingUp(); - - var reference = new DateTime(2025, 07, 03, 10, 0, 0); - security.SetMarketPrice(new Tick(reference, security.Symbol, 300, 300)); - - // feed orders across all partitions while workers block on the gate, so the backlog grows the pool to the maximum - var orderId = 0; - var reachedMax = SpinWait.SpinUntil(() => - { - if (orderId < 1000) - { - var request = MakeAsyncMarketRequest(security, reference); - request.SetOrderId(++orderId); - transactionHandler.Process(request); - } - return transactionHandler.ActiveThreadCount >= maxThreads; - }, 10000); - Assert.IsTrue(reachedMax, $"Pool did not grow to the maximum, current size: {transactionHandler.ActiveThreadCount}"); - - // keep a healthy backlog on every partition before releasing the workers - for (var i = 0; i < maxThreads * 5; i++) - { - var request = MakeAsyncMarketRequest(security, reference); - request.SetOrderId(++orderId); - transactionHandler.Process(request); - } - var enqueued = orderId; - - // release the workers and wait until the whole backlog drains - gate.Set(); - Assert.IsTrue(SpinWait.SpinUntil(() => transactionHandler.ProcessingSequence.Count >= enqueued, 15000), - $"processed {transactionHandler.ProcessingSequence.Count}/{enqueued}"); - - // within each partition, requests must keep their enqueue order (ascending OrderId) despite the pool scaling up - var processed = transactionHandler.ProcessingSequence.ToList(); - foreach (var partition in processed.GroupBy(x => x.OrderId % maxThreads)) - { - var ids = partition.Select(x => x.OrderId).ToList(); - CollectionAssert.AreEqual(ids.OrderBy(x => x).ToList(), ids, - $"partition {partition.Key} was processed out of order: {string.Join(",", ids)}"); - } - } - finally - { - gate.Set(); - transactionHandler.Exit(); - } - } - private static SubmitOrderRequest MakeAsyncMarketRequest(Security security, DateTime date) { return new SubmitOrderRequest(OrderType.Market, security.Type, security.Symbol, 1, 0, 0, 0, 0, false, date, "", @@ -3028,18 +2956,9 @@ private class TestableConcurrentBrokerageTransactionHandler : BrokerageTransacti public ConcurrentDictionary RequestProcessingThreads = new(); - // ordered record of processed requests, to assert per-OrderId ordering - public ConcurrentQueue<(int OrderId, OrderRequestType Type)> ProcessingSequence = new(); - // blocks workers to force a backlog public ManualResetEventSlim Gate; - // slows workers down to let a backlog build up - public int ProcessingDelayMs; - - // only record the delivery order, skipping the base order pipeline - public bool RecordOnly; - public int ActiveThreadCount => ProcessingThreadsCount; // overrides the pool maximum without touching the global Config @@ -3055,15 +2974,8 @@ public TestableConcurrentBrokerageTransactionHandler(int expectedOrdersCount, Ma public override void HandleOrderRequest(OrderRequest request) { Gate?.Wait(); - if (ProcessingDelayMs > 0) - { - Thread.Sleep(ProcessingDelayMs); - } - if (!RecordOnly) - { - base.HandleOrderRequest(request); - } + base.HandleOrderRequest(request); // Capture the thread name for debugging purposes var threadName = Thread.CurrentThread.Name ?? Environment.CurrentManagedThreadId.ToString(); @@ -3074,7 +2986,6 @@ public override void HandleOrderRequest(OrderRequest request) RequestProcessingThreads[request.OrderId] = threadName; ProcessedRequests.Add(request); - ProcessingSequence.Enqueue((request.OrderId, request.OrderRequestType)); if (Interlocked.Increment(ref _currentOrdersCount) >= _expectedOrdersCount) { From 31450d4fa50b951cb37e8b80329dcfcb0ec57837 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Wed, 17 Jun 2026 13:41:54 -0500 Subject: [PATCH 3/8] Move dynamic thread pool into transaction handler --- Common/Util/DynamicWorkerPool.cs | 232 ------------------ .../BrokerageTransactionHandler.cs | 124 ++++++---- Tests/Common/Util/DynamicWorkerPoolTests.cs | 172 ------------- .../BrokerageTransactionHandlerTests.cs | 3 +- 4 files changed, 81 insertions(+), 450 deletions(-) delete mode 100644 Common/Util/DynamicWorkerPool.cs delete mode 100644 Tests/Common/Util/DynamicWorkerPoolTests.cs diff --git a/Common/Util/DynamicWorkerPool.cs b/Common/Util/DynamicWorkerPool.cs deleted file mode 100644 index d2bd3af2811c..000000000000 --- a/Common/Util/DynamicWorkerPool.cs +++ /dev/null @@ -1,232 +0,0 @@ -/* - * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. - * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - -using System; -using System.Collections.Generic; -using System.Threading; - -namespace QuantConnect.Util -{ - /// - /// A worker pool that routes items into queues by key and processes each queue with its own thread. - /// It starts with a minimum number of workers and adds more on demand (up to a maximum) when a queue - /// starts to pile up. Items sharing a key go to the same queue, so they keep their relative order. - /// - /// The item type being processed - public class DynamicWorkerPool : IDisposable - { - private readonly Action _handler; - private readonly Action _onError; - private readonly string _threadName; - private readonly int _minWorkers; - private readonly int _maxWorkers; - - private readonly BusyBlockingCollection[] _queues; - private readonly List _workers; - private readonly CancellationTokenSource _cancellationTokenSource; - private readonly object _lock = new object(); - private int _activeWorkers; - private bool _disposed; - - /// - /// The number of worker threads currently running - /// - public int WorkerCount => Volatile.Read(ref _activeWorkers); - - /// - /// True while any queue still has items to process - /// - public bool IsBusy - { - get - { - if (Volatile.Read(ref _disposed)) - { - return false; - } - for (var i = 0; i < _queues.Length; i++) - { - if (_queues[i].IsBusy) - { - return true; - } - } - return false; - } - } - - /// - /// Initializes a new instance of the class - /// - /// The action invoked to process each item - /// The number of worker threads to start with (at least 1) - /// The maximum number of worker threads the pool can grow to - /// Optional callback invoked when the handler throws an unexpected exception - /// Optional name prefix used for the worker threads - public DynamicWorkerPool(Action handler, int minWorkers, int maxWorkers, Action onError = null, string threadName = "DynamicWorkerPool") - { - _handler = handler ?? throw new ArgumentNullException(nameof(handler)); - _maxWorkers = Math.Max(1, maxWorkers); - _minWorkers = Math.Min(Math.Max(1, minWorkers), _maxWorkers); - _onError = onError; - _threadName = threadName; - - _queues = new BusyBlockingCollection[_maxWorkers]; - for (var i = 0; i < _maxWorkers; i++) - { - _queues[i] = new BusyBlockingCollection(); - } - _workers = new List(_maxWorkers); - _cancellationTokenSource = new CancellationTokenSource(); - } - - /// - /// Starts the pool with the minimum number of worker threads. Idempotent. - /// - public void Start() - { - lock (_lock) - { - if (_workers.Count > 0) - { - return; - } - for (var i = 0; i < _minWorkers; i++) - { - StartWorker(i); - } - _activeWorkers = _minWorkers; - } - } - - /// - /// Routes an item to a queue by and adds a worker if that queue is piling up - /// - /// The routing key; the same key maps to the same queue while the pool size is stable - /// The item to process - public void Enqueue(long key, T item) - { - var active = Volatile.Read(ref _activeWorkers); - var index = (int)(key % active); - if (index < 0) - { - index += active; - } - - var queue = _queues[index]; - queue.Add(item); - - // the queue is piling up faster than its worker can process it: grow the pool - if (active < _maxWorkers && queue.Count > 1) - { - Grow(); - } - } - - /// - /// Waits until all queues are empty and idle, or the timeout elapses - /// - /// True if the pool became idle, false on timeout - public bool WaitForIdle(TimeSpan timeout) - { - var deadline = DateTime.UtcNow + timeout; - while (IsBusy) - { - if (DateTime.UtcNow >= deadline) - { - return false; - } - Thread.Sleep(1); - } - return true; - } - - /// - /// Stops the pool, signaling the workers to exit and waiting for them to finish - /// - public void Dispose() - { - lock (_lock) - { - if (_disposed) - { - return; - } - _disposed = true; - } - - _cancellationTokenSource.Cancel(); - foreach (var queue in _queues) - { - queue.CompleteAdding(); - } - - lock (_lock) - { - foreach (var worker in _workers) - { - worker?.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource); - } - } - - foreach (var queue in _queues) - { - queue.DisposeSafely(); - } - _cancellationTokenSource.DisposeSafely(); - } - - private void Grow() - { - lock (_lock) - { - if (_activeWorkers >= _maxWorkers || _cancellationTokenSource.IsCancellationRequested) - { - return; - } - StartWorker(_activeWorkers); - // publish the new worker only after it has started so routing never targets a missing queue - _activeWorkers++; - } - } - - private void StartWorker(int index) - { - var worker = new Thread(() => WorkerLoop(index)) { IsBackground = true, Name = $"{_threadName} {index}" }; - _workers.Add(worker); - worker.Start(); - } - - private void WorkerLoop(int index) - { - try - { - foreach (var item in _queues[index].GetConsumingEnumerable(_cancellationTokenSource.Token)) - { - _handler(item); - } - } - catch (OperationCanceledException) - { - // shutting down - } - catch (Exception err) - { - _onError?.Invoke(err); - } - } - } -} diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index 5722bd1e5571..6fcb85d1e74e 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -75,9 +75,12 @@ public class BrokerageTransactionHandler : ITransactionHandler /// protected List> _orderRequestQueues { get; set; } - // Worker pool for concurrent order processing, routed by OrderId, growing on demand. Null in the synchronous backtest path. - private DynamicWorkerPool _pool; - + private List _processingThreads; + // the transaction thread pool starts with the minimum number of workers and grows up to the maximum on + // demand. One queue is allocated per potential worker so routing by OrderId stays stable as it grows. + private int _activeTransactionThreads; + private int _maximumTransactionThreads; + private readonly object _processingThreadsLock = new object(); private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); private readonly ConcurrentQueue _orderEvents = new ConcurrentQueue(); @@ -238,28 +241,23 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu /// protected virtual void InitializeTransactionThread() { - // The pool starts with the minimum number of workers and grows up to the maximum on demand. - // Requests are routed by OrderId, and each order is processed by a single worker at a time, - // which preserves the Submit/Update/Cancel ordering per OrderId even as the pool scales. - var maxThreads = _brokerage.ConcurrencyEnabled ? Math.Max(1, MaximumTransactionThreads) : 1; - var minThreads = _brokerage.ConcurrencyEnabled ? Math.Min(Math.Max(1, MinimumTransactionThreads), maxThreads) : 1; - - _pool = new DynamicWorkerPool( - request => - { - HandleOrderRequest(request); - ProcessAsynchronousEvents(); - }, - minThreads, - maxThreads, - onError: err => - { - // unexpected error, we need to close down shop - _algorithm.SetRuntimeError(err, "HandleOrderRequest"); - IsActive = false; - }, - threadName: "Transaction Thread"); - _pool.Start(); + // multi threaded queue, used for live deployments. We allocate one queue per potential worker so + // requests keep routing to the same queue by OrderId, but only start the minimum number of workers. + // More are started on demand (up to the maximum) when a queue starts to pile up. + _maximumTransactionThreads = _brokerage.ConcurrencyEnabled ? Math.Max(1, MaximumTransactionThreads) : 1; + var minThreads = _brokerage.ConcurrencyEnabled ? Math.Min(Math.Max(1, MinimumTransactionThreads), _maximumTransactionThreads) : 1; + + _orderRequestQueues = new(_maximumTransactionThreads); + _processingThreads = new(_maximumTransactionThreads); + for (var i = 0; i < _maximumTransactionThreads; i++) + { + _orderRequestQueues.Add(new BusyBlockingCollection()); + } + for (var i = 0; i < minThreads; i++) + { + StartProcessingThread(i); + } + _activeTransactionThreads = minThreads; } /// @@ -275,7 +273,28 @@ protected virtual void InitializeTransactionThread() /// /// The number of worker threads currently running in the dynamic transaction thread pool /// - protected int ProcessingThreadsCount => _pool?.WorkerCount ?? 0; + protected int ProcessingThreadsCount => Volatile.Read(ref _activeTransactionThreads); + + private void StartProcessingThread(int threadId) + { + var thread = new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {threadId}" }; + _processingThreads.Add(thread); + thread.Start(); + } + + private void GrowProcessingThreads() + { + lock (_processingThreadsLock) + { + if (_activeTransactionThreads >= _maximumTransactionThreads || _cancellationTokenSource.IsCancellationRequested) + { + return; + } + StartProcessingThread(_activeTransactionThreads); + // publish the new worker only after it started so routing never targets a queue without a thread + _activeTransactionThreads++; + } + } /// /// Boolean flag indicating the Run thread method is busy. @@ -697,8 +716,7 @@ public List GetOpenOrders(Func filter = null) } /// - /// Processes the order request queue synchronously. Used by the backtesting transaction handler, - /// which processes order requests on the algorithm thread instead of using the worker pool. + /// Primary thread entry point to launch the transaction thread. /// protected void Run(int threadId) { @@ -715,6 +733,12 @@ protected void Run(int threadId) // unexpected error, we need to close down shop _algorithm.SetRuntimeError(err, "HandleOrderRequest"); } + + if (_processingThreads != null) + { + Log.Trace($"BrokerageTransactionHandler.Run(): Ending Thread {threadId}..."); + IsActive = false; + } } /// @@ -735,11 +759,7 @@ public virtual void ProcessSynchronousEvents() // in backtesting we need to wait for orders to be removed from the queue and finished processing if (!_algorithm.LiveMode) { - if (_orderRequestQueues != null && _orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))) - { - Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing."); - } - else if (_pool != null && !_pool.WaitForIdle(Time.OneSecond)) + if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))) { Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing."); } @@ -822,14 +842,23 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm) public void Exit() { var timeout = TimeSpan.FromSeconds(60); - if (_pool != null) + if (_processingThreads != null) { - // wait for the pool to finish processing pending requests, then stop the workers - if (!_pool.WaitForIdle(timeout)) + // only wait if the processing thread is running + if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout))) { Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds."); } - _pool.DisposeSafely(); + + foreach (var queue in _orderRequestQueues) + { + queue.CompleteAdding(); + } + + foreach (var thread in _processingThreads) + { + thread?.StopSafely(timeout, _cancellationTokenSource); + } } IsActive = false; _cancellationTokenSource.DisposeSafely(); @@ -1952,22 +1981,29 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity) private void EnqueueOrderRequest(OrderRequest request, Order order) { - // route by OrderId (or combo group id) so all requests for the same order are processed - // in order by a single worker; the pool keeps the routing stable as it scales + // route by OrderId (or combo group id) so all requests for the same order go to the same queue and + // are processed in order by a single worker; the routing stays stable while the pool size is steady var queueKey = request.OrderId; if (order.GroupOrderManager?.Id > 0) { queueKey = order.GroupOrderManager.Id; } - if (_pool != null) + var active = Volatile.Read(ref _activeTransactionThreads); + if (active == 0) { - _pool.Enqueue(queueKey, request); + // synchronous backtest path: a single queue processed on the algorithm thread + _orderRequestQueues[queueKey % _orderRequestQueues.Count].Add(request); + return; } - else + + var queue = _orderRequestQueues[queueKey % active]; + queue.Add(request); + + // the queue is piling up faster than its worker can process it: add a worker, up to the maximum + if (active < _maximumTransactionThreads && queue.Count > 1) { - // synchronous backtest path: a single queue processed on the algorithm thread - _orderRequestQueues[(int)(queueKey % _orderRequestQueues.Count)].Add(request); + GrowProcessingThreads(); } } diff --git a/Tests/Common/Util/DynamicWorkerPoolTests.cs b/Tests/Common/Util/DynamicWorkerPoolTests.cs deleted file mode 100644 index 9860aad881e0..000000000000 --- a/Tests/Common/Util/DynamicWorkerPoolTests.cs +++ /dev/null @@ -1,172 +0,0 @@ -/* - * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. - * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Threading; -using NUnit.Framework; -using QuantConnect.Util; - -namespace QuantConnect.Tests.Common.Util -{ - [TestFixture] - public class DynamicWorkerPoolTests - { - [Test] - public void StartsWithMinimumWorkers() - { - using var pool = new DynamicWorkerPool(_ => { }, minWorkers: 2, maxWorkers: 10); - pool.Start(); - - Assert.AreEqual(2, pool.WorkerCount); - } - - [Test] - public void ProcessesAllEnqueuedItems() - { - const int count = 200; - var processed = new ConcurrentBag(); - using var done = new CountdownEvent(count); - using var pool = new DynamicWorkerPool(i => - { - processed.Add(i); - done.Signal(); - }, minWorkers: 2, maxWorkers: 10); - pool.Start(); - - for (var i = 0; i < count; i++) - { - pool.Enqueue(i, i); - } - - Assert.IsTrue(done.Wait(10000)); - CollectionAssert.AreEquivalent(Enumerable.Range(0, count), processed); - } - - [Test] - public void GrowsUnderBacklogUpToMaximum([Values(10, 3)] int maxWorkers) - { - using var gate = new ManualResetEventSlim(false); - using var pool = new DynamicWorkerPool(_ => gate.Wait(), minWorkers: 2, maxWorkers: maxWorkers); - pool.Start(); - Assert.AreEqual(2, pool.WorkerCount); - - try - { - // workers block on the gate, so the queues pile up and the pool grows to the maximum - var key = 0; - var reachedMax = SpinWait.SpinUntil(() => - { - if (key < 1000) - { - pool.Enqueue(key, key); - key++; - } - return pool.WorkerCount >= maxWorkers; - }, 10000); - - Assert.IsTrue(reachedMax, $"Pool did not grow to the maximum, current size: {pool.WorkerCount}"); - // never grows beyond the configured maximum - Assert.AreEqual(maxWorkers, pool.WorkerCount); - } - finally - { - gate.Set(); - } - } - - [Test] - public void DoesNotGrowWhenWorkersKeepUp() - { - // workers process instantly, so the queues never pile up and the pool stays minimal - using var pool = new DynamicWorkerPool(_ => { }, minWorkers: 2, maxWorkers: 10); - pool.Start(); - - for (var i = 0; i < 50; i++) - { - pool.Enqueue(i, i); - Thread.Sleep(1); - } - - Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(5))); - Assert.AreEqual(2, pool.WorkerCount); - } - - [Test] - public void PreservesOrderPerKey() - { - // with a stable pool size, items sharing a key go to the same queue and keep their order - const int workers = 10; - const int itemsPerKey = 50; - var sequence = new ConcurrentQueue<(int Key, int Value)>(); - using var pool = new DynamicWorkerPool<(int Key, int Value)>(sequence.Enqueue, - minWorkers: workers, maxWorkers: workers); - pool.Start(); - - for (var n = 0; n < itemsPerKey; n++) - { - for (var key = 0; key < workers; key++) - { - pool.Enqueue(key, (key, n)); - } - } - - Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(10))); - - foreach (var group in sequence.ToList().GroupBy(x => x.Key)) - { - var values = group.Select(x => x.Value).ToList(); - CollectionAssert.AreEqual(Enumerable.Range(0, itemsPerKey).ToList(), values, - $"key {group.Key} was processed out of order"); - } - } - - [Test] - public void WaitForIdleReturnsFalseOnTimeoutAndTrueWhenDrained() - { - using var gate = new ManualResetEventSlim(false); - using var pool = new DynamicWorkerPool(_ => gate.Wait(), minWorkers: 2, maxWorkers: 4); - pool.Start(); - - pool.Enqueue(0, 0); - // a worker is stuck on the gate, so the pool is busy - Assert.IsFalse(pool.WaitForIdle(TimeSpan.FromMilliseconds(200))); - - gate.Set(); - Assert.IsTrue(pool.WaitForIdle(TimeSpan.FromSeconds(5))); - Assert.IsFalse(pool.IsBusy); - } - - [Test] - public void InvokesOnErrorWhenHandlerThrows() - { - using var raised = new ManualResetEventSlim(false); - Exception captured = null; - using var pool = new DynamicWorkerPool( - _ => throw new InvalidOperationException("boom"), - minWorkers: 1, - maxWorkers: 1, - onError: err => { captured = err; raised.Set(); }); - pool.Start(); - - pool.Enqueue(0, 0); - - Assert.IsTrue(raised.Wait(5000)); - Assert.IsInstanceOf(captured); - } - } -} diff --git a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs index d9120cf59d07..b4ebd8148660 100644 --- a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs +++ b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs @@ -38,7 +38,6 @@ using System.Linq; using System.Reflection; using System.Threading; -using System.Threading.Tasks; using HistoryRequest = QuantConnect.Data.HistoryRequest; namespace QuantConnect.Tests.Engine.BrokerageTransactionHandlerTests @@ -2956,7 +2955,7 @@ private class TestableConcurrentBrokerageTransactionHandler : BrokerageTransacti public ConcurrentDictionary RequestProcessingThreads = new(); - // blocks workers to force a backlog + // blocks workers to force a sustained backlog so the pool grows public ManualResetEventSlim Gate; public int ActiveThreadCount => ProcessingThreadsCount; From bf7ef20108aa3dd5c39495bb7df462db7f9d70f3 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Wed, 17 Jun 2026 15:05:11 -0500 Subject: [PATCH 4/8] Grow transaction threads on demand with order pinning --- .../BrokerageTransactionHandler.cs | 133 ++++++++++++------ .../BrokerageTransactionHandlerTests.cs | 125 +++++++++++++++- 2 files changed, 210 insertions(+), 48 deletions(-) diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index 6fcb85d1e74e..f74936d4f792 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -76,10 +76,12 @@ public class BrokerageTransactionHandler : ITransactionHandler protected List> _orderRequestQueues { get; set; } private List _processingThreads; - // the transaction thread pool starts with the minimum number of workers and grows up to the maximum on - // demand. One queue is allocated per potential worker so routing by OrderId stays stable as it grows. - private int _activeTransactionThreads; + // maximum number of transaction threads (and queues) the pool can grow to on demand private int _maximumTransactionThreads; + // pins each order (or combo group) to one queue for its whole life, so all its requests are handled + // in order by the same thread even after the pool grows and changes the modulo used for new orders + private readonly Dictionary _orderRequestQueueIndexByKey = new(); + // guards on demand growth of the queues/threads against concurrent reads in Run/Exit/enqueue private readonly object _processingThreadsLock = new object(); private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); @@ -241,59 +243,86 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu /// protected virtual void InitializeTransactionThread() { - // multi threaded queue, used for live deployments. We allocate one queue per potential worker so - // requests keep routing to the same queue by OrderId, but only start the minimum number of workers. - // More are started on demand (up to the maximum) when a queue starts to pile up. - _maximumTransactionThreads = _brokerage.ConcurrencyEnabled ? Math.Max(1, MaximumTransactionThreads) : 1; - var minThreads = _brokerage.ConcurrencyEnabled ? Math.Min(Math.Max(1, MinimumTransactionThreads), _maximumTransactionThreads) : 1; + // live deployments start with the minimum number of threads and grow on demand (see TryExpandProcessingThreads) + // up to the maximum. No concurrency means a single thread, no growth. + int initialThreadsCount; + if (_brokerage.ConcurrencyEnabled) + { + _maximumTransactionThreads = Math.Max(1, MaximumTransactionThreads); + initialThreadsCount = Math.Min(Math.Max(1, MinimumTransactionThreads), _maximumTransactionThreads); + } + else + { + _maximumTransactionThreads = initialThreadsCount = 1; + } _orderRequestQueues = new(_maximumTransactionThreads); _processingThreads = new(_maximumTransactionThreads); - for (var i = 0; i < _maximumTransactionThreads; i++) + for (var i = 0; i < initialThreadsCount; i++) { - _orderRequestQueues.Add(new BusyBlockingCollection()); + AddProcessingThread(); } - for (var i = 0; i < minThreads; i++) - { - StartProcessingThread(i); - } - _activeTransactionThreads = minThreads; } /// - /// The maximum number of worker threads the dynamic transaction thread pool can grow to + /// The maximum number of transaction threads the pool can grow to /// protected virtual int MaximumTransactionThreads => Config.GetInt("maximum-transaction-threads", 10); /// - /// The number of worker threads the dynamic transaction thread pool starts with + /// The number of transaction threads the pool starts with /// protected virtual int MinimumTransactionThreads => Config.GetInt("minimum-transaction-threads", 2); /// - /// The number of worker threads currently running in the dynamic transaction thread pool + /// The number of transaction threads currently running /// - protected int ProcessingThreadsCount => Volatile.Read(ref _activeTransactionThreads); + protected int ProcessingThreadsCount + { + get + { + lock (_processingThreadsLock) + { + return _processingThreads?.Count ?? 0; + } + } + } - private void StartProcessingThread(int threadId) + /// + /// Creates a queue and its dedicated thread and starts it. + /// Callers growing the pool on demand must hold . + /// + private void AddProcessingThread() { + var threadId = _orderRequestQueues.Count; // matches the queue index this thread will consume + _orderRequestQueues.Add(new BusyBlockingCollection()); var thread = new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {threadId}" }; _processingThreads.Add(thread); thread.Start(); } - private void GrowProcessingThreads() + /// + /// Grows the pool only when every thread is busy and still has pending requests, up to the maximum. + /// Caller must hold . + /// + private void TryExpandProcessingThreads() { - lock (_processingThreadsLock) + if (_orderRequestQueues.Count >= _maximumTransactionThreads || _cancellationTokenSource.IsCancellationRequested) { - if (_activeTransactionThreads >= _maximumTransactionThreads || _cancellationTokenSource.IsCancellationRequested) + return; + } + + // only grow when the whole pool is saturated: every thread busy and with requests still waiting + for (var i = 0; i < _orderRequestQueues.Count; i++) + { + var queue = _orderRequestQueues[i]; + if (!queue.IsBusy || queue.Count == 0) { return; } - StartProcessingThread(_activeTransactionThreads); - // publish the new worker only after it started so routing never targets a queue without a thread - _activeTransactionThreads++; } + + AddProcessingThread(); } /// @@ -720,9 +749,16 @@ public List GetOpenOrders(Func filter = null) /// protected void Run(int threadId) { + IBusyCollection queue; + lock (_processingThreadsLock) + { + // capture our queue safely, the queues list may be growing on demand concurrently + queue = _orderRequestQueues[threadId]; + } + try { - foreach (var request in _orderRequestQueues[threadId].GetConsumingEnumerable(_cancellationTokenSource.Token)) + foreach (var request in queue.GetConsumingEnumerable(_cancellationTokenSource.Token)) { HandleOrderRequest(request); ProcessAsynchronousEvents(); @@ -844,18 +880,27 @@ public void Exit() var timeout = TimeSpan.FromSeconds(60); if (_processingThreads != null) { + // snapshot under the lock since the pool might still be growing on demand concurrently + List> queues; + List threads; + lock (_processingThreadsLock) + { + queues = _orderRequestQueues.ToList(); + threads = _processingThreads.ToList(); + } + // only wait if the processing thread is running - if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout))) + if (queues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout))) { Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds."); } - foreach (var queue in _orderRequestQueues) + foreach (var queue in queues) { queue.CompleteAdding(); } - foreach (var thread in _processingThreads) + foreach (var thread in threads) { thread?.StopSafely(timeout, _cancellationTokenSource); } @@ -1981,30 +2026,30 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity) private void EnqueueOrderRequest(OrderRequest request, Order order) { - // route by OrderId (or combo group id) so all requests for the same order go to the same queue and - // are processed in order by a single worker; the routing stays stable while the pool size is steady + // route by OrderId (or combo group id) so requests for the same order keep their order on one queue var queueKey = request.OrderId; if (order.GroupOrderManager?.Id > 0) { queueKey = order.GroupOrderManager.Id; } - var active = Volatile.Read(ref _activeTransactionThreads); - if (active == 0) + IBusyCollection queue; + lock (_processingThreadsLock) { - // synchronous backtest path: a single queue processed on the algorithm thread - _orderRequestQueues[queueKey % _orderRequestQueues.Count].Add(request); - return; + // grow the pool first if every existing thread is already saturated + TryExpandProcessingThreads(); + + // reuse the order's pinned queue if it has one, so it is never re-routed when the pool grows + if (!_orderRequestQueueIndexByKey.TryGetValue(queueKey, out var queueIndex)) + { + queueIndex = queueKey % _orderRequestQueues.Count; + _orderRequestQueueIndexByKey[queueKey] = queueIndex; + } + queue = _orderRequestQueues[queueIndex]; } - var queue = _orderRequestQueues[queueKey % active]; + // add outside the lock, since it can block when the queue is at its bounded capacity queue.Add(request); - - // the queue is piling up faster than its worker can process it: add a worker, up to the maximum - if (active < _maximumTransactionThreads && queue.Count > 1) - { - GrowProcessingThreads(); - } } /// diff --git a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs index b4ebd8148660..4e008c19b9b5 100644 --- a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs +++ b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs @@ -2525,7 +2525,7 @@ public void TransactionThreadPoolStartsAtMinimumThreads() try { - // the pool starts with the minimum number of worker threads and grows only on demand + // the pool starts with the minimum number of threads and grows only on demand Assert.AreEqual(2, transactionHandler.ActiveThreadCount); } finally @@ -2563,8 +2563,7 @@ public void TransactionThreadPoolGrowsUnderBacklogUpToMaximum(int maximumThreads // starts at the minimum Assert.AreEqual(2, transactionHandler.ActiveThreadCount); - // keep feeding orders while the workers stay busy on the gate (sustained saturation), - // which is what makes the starving pool grow up to the configured maximum + // keep feeding orders while threads stay blocked on the gate, forcing the pool to grow to the max var orderId = 0; var reachedMax = SpinWait.SpinUntil(() => { @@ -2588,6 +2587,124 @@ public void TransactionThreadPoolGrowsUnderBacklogUpToMaximum(int maximumThreads } } + [Test] + public void KeepsAnOrderOnTheSameThreadAfterThePoolGrows() + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + using var finishedEvent = new ManualResetEventSlim(false); + using var gate = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) + { + Gate = gate, + MaxThreadsOverride = 10 + }; + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + algorithm.Transactions.SetOrderProcessor(transactionHandler); + algorithm.SetCash(100000); + algorithm.SetFinishedWarmingUp(); + + var security1 = (Security)algorithm.AddEquity("SPY"); + var security2 = (Security)algorithm.AddEquity("AAPL"); + + var reference = new DateTime(2025, 07, 03, 10, 0, 0); + security1.SetMarketPrice(new Tick(reference, security1.Symbol, 500, 500)); + security2.SetMarketPrice(new Tick(reference, security2.Symbol, 200, 200)); + + // group id 2 pins to queue 0 (2 % 2) while the pool is at the minimum; once it grows to >= 3 + // an un-pinned request would route to queue 2 (2 % count), so this scenario detects re-routing + var groupOrderManager = new GroupOrderManager(2, 2, -1, 1m); + var leg1 = new SubmitOrderRequest(OrderType.ComboLimit, security1.Type, security1.Symbol, -1, 1m, 0, reference, "", + groupOrderManager: groupOrderManager); + leg1.SetOrderId(1); + transactionHandler.Process(leg1); + + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + + // saturate the pool with unrelated orders so it grows past the minimum + var orderId = 100; + var grew = SpinWait.SpinUntil(() => + { + if (orderId < 1100) + { + var request = MakeAsyncMarketRequest(security1, reference); + request.SetOrderId(++orderId); + transactionHandler.Process(request); + } + return transactionHandler.ActiveThreadCount >= 3; + }, 10000); + Assert.IsTrue(grew, $"the pool did not grow, current size: {transactionHandler.ActiveThreadCount}"); + + // leg 2 of the same combo arrives after the pool grew; the pin must keep it on the original queue + var leg2 = new SubmitOrderRequest(OrderType.ComboLimit, security2.Type, security2.Symbol, 1, 1m, 0, reference, "", + groupOrderManager: groupOrderManager); + leg2.SetOrderId(2); + transactionHandler.Process(leg2); + + gate.Set(); + + // both legs must have been handled by the same thread despite the pool growing in between + Assert.IsTrue(SpinWait.SpinUntil(() => + transactionHandler.RequestProcessingThreads.ContainsKey(leg1.OrderId) && + transactionHandler.RequestProcessingThreads.ContainsKey(leg2.OrderId), 10000), + "the combo legs were not processed"); + Assert.AreEqual(transactionHandler.RequestProcessingThreads[leg1.OrderId], + transactionHandler.RequestProcessingThreads[leg2.OrderId]); + } + finally + { + gate.Set(); + transactionHandler.Exit(); + } + } + + [Test] + public void DoesNotGrowWhenThePoolIsNotSaturated() + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + using var finishedEvent = new ManualResetEventSlim(false); + using var gate = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) + { + Gate = gate, + MaxThreadsOverride = 10 + }; + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + algorithm.Transactions.SetOrderProcessor(transactionHandler); + algorithm.SetFinishedWarmingUp(); + + var security = (Security)algorithm.AddEquity("SPY"); + var reference = new DateTime(2025, 07, 03, 10, 0, 0); + security.SetMarketPrice(new Tick(reference, security.Symbol, 300, 300)); + + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + + // all even order ids route to the same queue (id % 2 == 0), keeping the other thread idle, + // so even with a backlog on one queue the pool must not grow + for (var i = 1; i <= 20; i++) + { + var request = MakeAsyncMarketRequest(security, reference); + request.SetOrderId(i * 2); + transactionHandler.Process(request); + } + + // growth is evaluated synchronously on each enqueue, so the count is final here + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + } + finally + { + gate.Set(); + transactionHandler.Exit(); + } + } + private static SubmitOrderRequest MakeAsyncMarketRequest(Security security, DateTime date) { return new SubmitOrderRequest(OrderType.Market, security.Type, security.Symbol, 1, 0, 0, 0, 0, false, date, "", @@ -2955,7 +3072,7 @@ private class TestableConcurrentBrokerageTransactionHandler : BrokerageTransacti public ConcurrentDictionary RequestProcessingThreads = new(); - // blocks workers to force a sustained backlog so the pool grows + // blocks threads so requests pile up and force the pool to grow public ManualResetEventSlim Gate; public int ActiveThreadCount => ProcessingThreadsCount; From 3cc97bfcd1b215907e5e6aee4da33c92887bcc06 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Fri, 19 Jun 2026 13:31:18 -0500 Subject: [PATCH 5/8] Extract order request processing into a dedicated pool --- .../BacktestingTransactionHandler.cs | 29 +- .../BrokerageTransactionHandler.cs | 223 +++++-------- .../OrderRequestProcessingPool.cs | 312 ++++++++++++++++++ .../BrokerageTransactionHandlerTests.cs | 8 +- 4 files changed, 397 insertions(+), 175 deletions(-) create mode 100644 Engine/TransactionHandlers/OrderRequestProcessingPool.cs diff --git a/Engine/TransactionHandlers/BacktestingTransactionHandler.cs b/Engine/TransactionHandlers/BacktestingTransactionHandler.cs index 8d26ff193315..c1ef72bb43c8 100644 --- a/Engine/TransactionHandlers/BacktestingTransactionHandler.cs +++ b/Engine/TransactionHandlers/BacktestingTransactionHandler.cs @@ -58,14 +58,14 @@ public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IRes _enableConcurrency = _brokerage.ConcurrencyEnabled && _algorithm.LiveMode; base.Initialize(algorithm, brokerage, resultHandler); - - if (!_enableConcurrency) - { - // non blocking implementation - _orderRequestQueues = new() { new BusyCollection() }; - } } + /// + /// For backtesting order requests are processed synchronously by the algorithm thread, only live + /// paper deployments with a concurrency enabled brokerage use background transaction threads + /// + protected override bool SynchronousProcessing => !_enableConcurrency; + /// /// Processes all synchronous events that must take place before the next time loop for the algorithm /// @@ -74,7 +74,7 @@ public override void ProcessSynchronousEvents() if (!_enableConcurrency) { // we process pending order requests our selves - Run(0); + ProcessPendingRequests(); } base.ProcessSynchronousEvents(); @@ -113,7 +113,7 @@ protected override void WaitForOrderSubmission(OrderTicket ticket) } // we submit the order request our selves - Run(0); + ProcessPendingRequests(); if (!ticket.OrderSet.WaitOne(0)) { @@ -124,18 +124,5 @@ protected override void WaitForOrderSubmission(OrderTicket ticket) "See the OrderRequest.Response for more information"); } } - - /// - /// For backtesting order requests will be processed by the algorithm thread - /// sequentially at and - /// - protected override void InitializeTransactionThread() - { - if (_enableConcurrency) - { - // let the base class handle this - base.InitializeTransactionThread(); - } - } } } diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index f74936d4f792..0699098fe54b 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -70,20 +70,10 @@ public class BrokerageTransactionHandler : ITransactionHandler private int _failedCashSyncAttempts; /// - /// OrderQueue holds the newly updated orders from the user algorithm waiting to be processed. Once - /// orders are processed they are moved into the Orders queue awaiting the brokerage response. + /// Holds the worker threads and their queues, dispatching each order request to the queue pinned to + /// its order and growing the pool on demand as the threads get saturated. /// - protected List> _orderRequestQueues { get; set; } - - private List _processingThreads; - // maximum number of transaction threads (and queues) the pool can grow to on demand - private int _maximumTransactionThreads; - // pins each order (or combo group) to one queue for its whole life, so all its requests are handled - // in order by the same thread even after the pool grows and changes the modulo used for new orders - private readonly Dictionary _orderRequestQueueIndexByKey = new(); - // guards on demand growth of the queues/threads against concurrent reads in Run/Exit/enqueue - private readonly object _processingThreadsLock = new object(); - private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + protected OrderRequestProcessingPool _threadPool; private readonly ConcurrentQueue _orderEvents = new ConcurrentQueue(); @@ -217,8 +207,6 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu HandleOrderUpdated(e); }; - IsActive = true; - if (_algorithm is QCAlgorithm qcAlgorithm) { _qcAlgorithmInstance = qcAlgorithm; @@ -237,31 +225,36 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu InitializeTransactionThread(); } + /// + /// Whether the transaction thread pool can grow on demand to process order requests concurrently. + /// When false a single worker thread is used. + /// + protected virtual bool ConcurrencyEnabled => _brokerage.ConcurrencyEnabled; + + /// + /// Whether order requests are drained synchronously by the algorithm thread instead of by background + /// worker threads. Used by backtesting deployments. + /// + protected virtual bool SynchronousProcessing => false; + /// /// Create and start the transaction thread, who will be in charge of processing /// the order requests /// protected virtual void InitializeTransactionThread() { - // live deployments start with the minimum number of threads and grow on demand (see TryExpandProcessingThreads) - // up to the maximum. No concurrency means a single thread, no growth. - int initialThreadsCount; - if (_brokerage.ConcurrencyEnabled) - { - _maximumTransactionThreads = Math.Max(1, MaximumTransactionThreads); - initialThreadsCount = Math.Min(Math.Max(1, MinimumTransactionThreads), _maximumTransactionThreads); - } - else + Action processRequest = request => { - _maximumTransactionThreads = initialThreadsCount = 1; - } + HandleOrderRequest(request); + ProcessAsynchronousEvents(); + }; + Action onError = error => _algorithm.SetRuntimeError(error, "HandleOrderRequest"); - _orderRequestQueues = new(_maximumTransactionThreads); - _processingThreads = new(_maximumTransactionThreads); - for (var i = 0; i < initialThreadsCount; i++) - { - AddProcessingThread(); - } + // backtesting drains a single queue synchronously on the algorithm thread, live deployments use + // background worker threads: a single one, or growing on demand up to the maximum when concurrent. + _threadPool = SynchronousProcessing + ? OrderRequestProcessingPool.Synchronous(processRequest, onError) + : new OrderRequestProcessingPool(ConcurrencyEnabled, MinimumTransactionThreads, MaximumTransactionThreads, processRequest, onError); } /// @@ -277,59 +270,13 @@ protected virtual void InitializeTransactionThread() /// /// The number of transaction threads currently running /// - protected int ProcessingThreadsCount - { - get - { - lock (_processingThreadsLock) - { - return _processingThreads?.Count ?? 0; - } - } - } - - /// - /// Creates a queue and its dedicated thread and starts it. - /// Callers growing the pool on demand must hold . - /// - private void AddProcessingThread() - { - var threadId = _orderRequestQueues.Count; // matches the queue index this thread will consume - _orderRequestQueues.Add(new BusyBlockingCollection()); - var thread = new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {threadId}" }; - _processingThreads.Add(thread); - thread.Start(); - } + protected int ProcessingThreadsCount => _threadPool?.ThreadCount ?? 0; /// - /// Grows the pool only when every thread is busy and still has pending requests, up to the maximum. - /// Caller must hold . + /// Boolean flag indicating the transaction threads are busy. + /// False indicates they are completely finished processing and ready to be terminated. /// - private void TryExpandProcessingThreads() - { - if (_orderRequestQueues.Count >= _maximumTransactionThreads || _cancellationTokenSource.IsCancellationRequested) - { - return; - } - - // only grow when the whole pool is saturated: every thread busy and with requests still waiting - for (var i = 0; i < _orderRequestQueues.Count; i++) - { - var queue = _orderRequestQueues[i]; - if (!queue.IsBusy || queue.Count == 0) - { - return; - } - } - - AddProcessingThread(); - } - - /// - /// Boolean flag indicating the Run thread method is busy. - /// False indicates it is completely finished processing and ready to be terminated. - /// - public bool IsActive { get; private set; } + public bool IsActive => _threadPool?.IsActive ?? false; #region Order Request Processing @@ -437,7 +384,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request) } /// - /// Wait for the order to be handled by the + /// Wait for the order to be handled by the /// /// The expecting to be submitted protected virtual void WaitForOrderSubmission(OrderTicket ticket) @@ -745,36 +692,12 @@ public List GetOpenOrders(Func filter = null) } /// - /// Primary thread entry point to launch the transaction thread. + /// Drains the pending order requests on the calling thread. Used by synchronous (non concurrent) + /// deployments, where the algorithm thread pumps the request queue itself. /// - protected void Run(int threadId) + protected void ProcessPendingRequests() { - IBusyCollection queue; - lock (_processingThreadsLock) - { - // capture our queue safely, the queues list may be growing on demand concurrently - queue = _orderRequestQueues[threadId]; - } - - try - { - foreach (var request in queue.GetConsumingEnumerable(_cancellationTokenSource.Token)) - { - HandleOrderRequest(request); - ProcessAsynchronousEvents(); - } - } - catch (Exception err) - { - // unexpected error, we need to close down shop - _algorithm.SetRuntimeError(err, "HandleOrderRequest"); - } - - if (_processingThreads != null) - { - Log.Trace($"BrokerageTransactionHandler.Run(): Ending Thread {threadId}..."); - IsActive = false; - } + _threadPool.ProcessPending(); } /// @@ -795,7 +718,7 @@ public virtual void ProcessSynchronousEvents() // in backtesting we need to wait for orders to be removed from the queue and finished processing if (!_algorithm.LiveMode) { - if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))) + if (_threadPool.WaitForProcessing(Time.OneSecond)) { Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing."); } @@ -878,35 +801,16 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm) public void Exit() { var timeout = TimeSpan.FromSeconds(60); - if (_processingThreads != null) + if (_threadPool != null) { - // snapshot under the lock since the pool might still be growing on demand concurrently - List> queues; - List threads; - lock (_processingThreadsLock) - { - queues = _orderRequestQueues.ToList(); - threads = _processingThreads.ToList(); - } - - // only wait if the processing thread is running - if (queues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout))) + // only wait if a queue is still processing + if (_threadPool.WaitForProcessing(timeout)) { Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds."); } - foreach (var queue in queues) - { - queue.CompleteAdding(); - } - - foreach (var thread in threads) - { - thread?.StopSafely(timeout, _cancellationTokenSource); - } + _threadPool.Shutdown(timeout); } - IsActive = false; - _cancellationTokenSource.DisposeSafely(); } /// @@ -1314,6 +1218,13 @@ private void HandleOrderEvents(List orderEvents) order.Status = orderEvent.Status; } + // once an order reaches a final state it won't receive any more requests, so release its pinned + // processing queue to keep the pin map bounded to the orders still in flight + if (order.Status.IsClosed()) + { + TryReleaseProcessingQueue(order); + } + orderEvent.Id = order.GetNewId(); // set the modified time of the order to the fill's timestamp @@ -2027,29 +1938,43 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity) private void EnqueueOrderRequest(OrderRequest request, Order order) { // route by OrderId (or combo group id) so requests for the same order keep their order on one queue - var queueKey = request.OrderId; + var routingKey = request.OrderId; if (order.GroupOrderManager?.Id > 0) { - queueKey = order.GroupOrderManager.Id; + routingKey = order.GroupOrderManager.Id; } - IBusyCollection queue; - lock (_processingThreadsLock) + _threadPool.Dispatch(request, routingKey); + } + + /// + /// Releases the processing queue pinned to a closed order so the pin map stays bounded to the orders still + /// in flight. A combo group shares a single queue keyed by its group id, so it is only released once every + /// leg has reached a final state, mirroring the routing key used in . + /// + private void TryReleaseProcessingQueue(Order order) + { + var group = order.GroupOrderManager; + if (group == null || group.Id <= 0) { - // grow the pool first if every existing thread is already saturated - TryExpandProcessingThreads(); + _threadPool.Release(order.Id); + return; + } - // reuse the order's pinned queue if it has one, so it is never re-routed when the pool grows - if (!_orderRequestQueueIndexByKey.TryGetValue(queueKey, out var queueIndex)) + // the whole group routes through one queue; its still-open legs must keep landing on that same queue, + // so we can only release it once every leg has been submitted and reached a final state + if (group.OrderIds.Count < group.Count) + { + return; + } + foreach (var legId in group.OrderIds) + { + if (!_completeOrders.TryGetValue(legId, out var leg) || !leg.Status.IsClosed()) { - queueIndex = queueKey % _orderRequestQueues.Count; - _orderRequestQueueIndexByKey[queueKey] = queueIndex; + return; } - queue = _orderRequestQueues[queueIndex]; } - - // add outside the lock, since it can block when the queue is at its bounded capacity - queue.Add(request); + _threadPool.Release(group.Id); } /// diff --git a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs new file mode 100644 index 000000000000..95c12b5739ef --- /dev/null +++ b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs @@ -0,0 +1,312 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using QuantConnect.Interfaces; +using QuantConnect.Logging; +using QuantConnect.Orders; +using QuantConnect.Util; + +namespace QuantConnect.Lean.Engine.TransactionHandlers +{ + /// + /// Holds the worker threads and their queues used to process order requests, dispatching each + /// request to the queue pinned to its order and growing the pool on demand when it gets saturated. + /// + /// + /// In concurrent mode each thread owns a single it consumes, + /// the pool starts at the minimum number of threads and grows up to the maximum when every thread is + /// busy with pending work. In synchronous mode there are no worker threads: a single non blocking queue + /// is drained on the caller thread via . + /// + public class OrderRequestProcessingPool + { + // one queue per worker thread; the newly updated order requests wait here to be processed + private readonly List> _queues; + private readonly List _threads; + // pins each order (or combo group) to one queue for its whole life, so all its requests are handled + // in order by the same thread even after the pool grows and re-routes new orders to other queues + private readonly Dictionary _queueIndexByKey = new(); + // guards on demand growth of the queues/threads against concurrent reads in Run/Dispatch/Shutdown + private readonly object _lock = new object(); + // maximum number of threads (and queues) the pool can grow to on demand + private readonly int _maximumThreads; + // true when there are no worker threads and the caller drains the single queue itself + private readonly bool _synchronous; + private readonly Action _processRequest; + private readonly Action _onError; + private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + + /// + /// True while the pool is processing order requests, false once its worker threads have finished. + /// + public bool IsActive { get; private set; } + + /// + /// The number of worker threads currently running. + /// + public int ThreadCount + { + get + { + lock (_lock) + { + return _threads.Count; + } + } + } + + /// + /// Creates a threaded pool and starts its initial worker threads. When concurrency is enabled the pool + /// starts at and grows on demand up to , + /// otherwise it runs a single fixed worker thread. + /// + /// True to grow the pool on demand, false to run a single worker thread + /// The number of worker threads the pool starts with when growing + /// The maximum number of worker threads the pool can grow to on demand + /// Handles a single order request + /// Invoked when processing fails unexpectedly + public OrderRequestProcessingPool(bool concurrencyEnabled, int minimumThreads, int maximumThreads, + Action processRequest, Action onError) + { + _synchronous = false; + _processRequest = processRequest; + _onError = onError; + // concurrency grows the pool minimum..maximum on demand, otherwise a single fixed thread is used + _maximumThreads = concurrencyEnabled ? Math.Max(1, maximumThreads) : 1; + var initialThreadsCount = concurrencyEnabled ? Math.Min(Math.Max(1, minimumThreads), _maximumThreads) : 1; + + _queues = new(_maximumThreads); + _threads = new(_maximumThreads); + IsActive = true; + for (var i = 0; i < initialThreadsCount; i++) + { + AddThread(); + } + } + + /// + /// Private constructor for the synchronous pool: a single non blocking queue and no worker threads. + /// + private OrderRequestProcessingPool(Action processRequest, Action onError) + { + _synchronous = true; + _processRequest = processRequest; + _onError = onError; + _maximumThreads = 1; + + _queues = new(1) { new BusyCollection() }; + _threads = new(0); + IsActive = true; + } + + /// + /// Creates a synchronous pool with no worker threads: its single queue is drained on the caller thread + /// via . + /// + /// Handles a single order request + /// Invoked when processing fails unexpectedly + public static OrderRequestProcessingPool Synchronous(Action processRequest, Action onError) + { + return new OrderRequestProcessingPool(processRequest, onError); + } + + /// + /// Dispatches an order request to the queue pinned to its routing key, growing the pool first if + /// every existing thread is already saturated. + /// + /// The order request to process + /// Identifies the order (or combo group) the request belongs to + public void Dispatch(OrderRequest request, int routingKey) + { + IBusyCollection queue; + lock (_lock) + { + // grow the pool first if every existing thread is already saturated + TryExpand(); + + // reuse the order's pinned queue if it has one, so it is never re-routed when the pool grows + if (!_queueIndexByKey.TryGetValue(routingKey, out var queueIndex)) + { + queueIndex = routingKey % _queues.Count; + _queueIndexByKey[routingKey] = queueIndex; + } + queue = _queues[queueIndex]; + } + + // add outside the lock, since it can block when the queue is at its bounded capacity + queue.Add(request); + } + + /// + /// Releases the queue pinned to the given routing key once its order reaches a final state, keeping the + /// pin map bounded to the orders still in flight. + /// + /// The routing key previously used in + public void Release(int routingKey) + { + lock (_lock) + { + _queueIndexByKey.Remove(routingKey); + } + } + + /// + /// Drains the pending order requests on the calling thread. Only used in synchronous mode, where there + /// are no worker threads and the caller pumps the single queue itself. + /// + public void ProcessPending() + { + try + { + Consume(_queues[0]); + } + catch (Exception err) + { + // unexpected error, we need to close down shop + _onError(err); + } + } + + /// + /// Waits for every queue to finish processing its pending requests, up to the given timeout. + /// + /// The maximum time to wait + /// True if any queue was still busy when the timeout elapsed + public bool WaitForProcessing(TimeSpan timeout) + { + // synchronous mode has no worker thread to drain the queue, the caller pumps it via ProcessPending + if (_synchronous) + { + return false; + } + + List> queues; + lock (_lock) + { + // snapshot under the lock since the queues list may be growing on demand concurrently + queues = _queues.ToList(); + } + + return queues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout, _cancellationTokenSource.Token)); + } + + /// + /// Stops every worker thread and waits for them to terminate, up to the given timeout. + /// + /// The maximum time to wait for each thread to stop + public void Shutdown(TimeSpan timeout) + { + List> queues; + List threads; + lock (_lock) + { + // snapshot under the lock since the pool might still be growing on demand concurrently + queues = _queues.ToList(); + threads = _threads.ToList(); + } + + foreach (var queue in queues) + { + queue.CompleteAdding(); + } + + foreach (var thread in threads) + { + thread?.StopSafely(timeout, _cancellationTokenSource); + } + + IsActive = false; + _cancellationTokenSource.DisposeSafely(); + } + + /// + /// Creates a queue and its dedicated worker thread and starts it. + /// Callers growing the pool on demand must hold . + /// + private void AddThread() + { + var threadId = _queues.Count; // matches the queue index this thread will consume + _queues.Add(new BusyBlockingCollection()); + var thread = new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {threadId}" }; + _threads.Add(thread); + thread.Start(); + } + + /// + /// Grows the pool only when every thread is busy and still has pending requests, up to the maximum. + /// Caller must hold . + /// + private void TryExpand() + { + if (_synchronous || _queues.Count >= _maximumThreads || _cancellationTokenSource.IsCancellationRequested) + { + return; + } + + // only grow when the whole pool is saturated: every thread busy and with requests still waiting + for (var i = 0; i < _queues.Count; i++) + { + var queue = _queues[i]; + if (!queue.IsBusy || queue.Count == 0) + { + return; + } + } + + AddThread(); + } + + /// + /// Worker thread entry point: consumes its queue until the pool is shut down. + /// + private void Run(int threadId) + { + IBusyCollection queue; + lock (_lock) + { + // capture our queue safely, the queues list may be growing on demand concurrently + queue = _queues[threadId]; + } + + try + { + Consume(queue); + } + catch (Exception err) + { + // unexpected error, we need to close down shop + _onError(err); + } + + Log.Trace($"OrderRequestProcessingPool.Run(): Ending Thread {threadId}..."); + IsActive = false; + } + + /// + /// Processes every request the queue yields, handing each one to the configured processor. + /// + private void Consume(IBusyCollection queue) + { + foreach (var request in queue.GetConsumingEnumerable(_cancellationTokenSource.Token)) + { + _processRequest(request); + } + } + } +} diff --git a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs index 4e008c19b9b5..7a754ef47dfb 100644 --- a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs +++ b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs @@ -2962,6 +2962,9 @@ public class TestBrokerageTransactionHandler : BrokerageTransactionHandler protected override TimeSpan TimeSinceLastFill => TestTimeSinceLastFill; + // no worker thread: these tests drive HandleOrderRequest manually + protected override bool SynchronousProcessing => true; + public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResultHandler resultHandler) { _brokerage = brokerage; @@ -2974,11 +2977,6 @@ public DateTime GetLastSyncDate() return _brokerage.LastSyncDateTimeUtc.ConvertFromUtc(TimeZones.NewYork); } - protected override void InitializeTransactionThread() - { - _orderRequestQueues = new() { new BusyCollection() }; - } - public new void RoundOrderPrices(Order order, Security security) { base.RoundOrderPrices(order, security); From af2d302a3e09908cd7dbe780b71edd93f37db015 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Mon, 22 Jun 2026 11:32:08 -0500 Subject: [PATCH 6/8] Move request routing and release into the processing pool --- .../BrokerageTransactionHandler.cs | 52 ++----------------- .../OrderRequestProcessingPool.cs | 52 +++++++++++++++---- 2 files changed, 48 insertions(+), 56 deletions(-) diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index 0699098fe54b..274d28120ba2 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -356,7 +356,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request) order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close); _openOrders[order.Id] = new OpenOrderState(order, ticket, security); - EnqueueOrderRequest(request, order); + _threadPool.Dispatch(request, order); WaitForOrderSubmission(ticket); } @@ -472,7 +472,7 @@ public OrderTicket UpdateOrder(UpdateOrderRequest request) else { request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing); - EnqueueOrderRequest(request, order); + _threadPool.Dispatch(request, order); } } catch (Exception err) @@ -544,7 +544,7 @@ public OrderTicket CancelOrder(CancelOrderRequest request) // send the request to be processed request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing); - EnqueueOrderRequest(request, order); + _threadPool.Dispatch(request, order); } } catch (Exception err) @@ -1218,11 +1218,10 @@ private void HandleOrderEvents(List orderEvents) order.Status = orderEvent.Status; } - // once an order reaches a final state it won't receive any more requests, so release its pinned - // processing queue to keep the pin map bounded to the orders still in flight + // notify the pool once an order reaches a final state so it can release its processing queue if (order.Status.IsClosed()) { - TryReleaseProcessingQueue(order); + _threadPool.Release(order); } orderEvent.Id = order.GetNewId(); @@ -1935,47 +1934,6 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity) return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity})"; } - private void EnqueueOrderRequest(OrderRequest request, Order order) - { - // route by OrderId (or combo group id) so requests for the same order keep their order on one queue - var routingKey = request.OrderId; - if (order.GroupOrderManager?.Id > 0) - { - routingKey = order.GroupOrderManager.Id; - } - - _threadPool.Dispatch(request, routingKey); - } - - /// - /// Releases the processing queue pinned to a closed order so the pin map stays bounded to the orders still - /// in flight. A combo group shares a single queue keyed by its group id, so it is only released once every - /// leg has reached a final state, mirroring the routing key used in . - /// - private void TryReleaseProcessingQueue(Order order) - { - var group = order.GroupOrderManager; - if (group == null || group.Id <= 0) - { - _threadPool.Release(order.Id); - return; - } - - // the whole group routes through one queue; its still-open legs must keep landing on that same queue, - // so we can only release it once every leg has been submitted and reached a final state - if (group.OrderIds.Count < group.Count) - { - return; - } - foreach (var legId in group.OrderIds) - { - if (!_completeOrders.TryGetValue(legId, out var leg) || !leg.Status.IsClosed()) - { - return; - } - } - _threadPool.Release(group.Id); - } /// /// Holds an order and its state diff --git a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs index 95c12b5739ef..efc31a77eada 100644 --- a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs +++ b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs @@ -42,6 +42,8 @@ public class OrderRequestProcessingPool // pins each order (or combo group) to one queue for its whole life, so all its requests are handled // in order by the same thread even after the pool grows and re-routes new orders to other queues private readonly Dictionary _queueIndexByKey = new(); + // tracks the completed legs of each combo group, so its pinned queue is only released once they are all done + private readonly Dictionary> _completedComboLegs = new(); // guards on demand growth of the queues/threads against concurrent reads in Run/Dispatch/Shutdown private readonly object _lock = new object(); // maximum number of threads (and queues) the pool can grow to on demand @@ -127,13 +129,16 @@ public static OrderRequestProcessingPool Synchronous(Action proces } /// - /// Dispatches an order request to the queue pinned to its routing key, growing the pool first if - /// every existing thread is already saturated. + /// Dispatches an order request to the queue pinned to its order, growing the pool first if every existing + /// thread is already saturated. All the requests of an order, and of every leg of a combo group, are routed + /// to the same queue so they are processed in order by a single thread. /// /// The order request to process - /// Identifies the order (or combo group) the request belongs to - public void Dispatch(OrderRequest request, int routingKey) + /// The order the request belongs to, used to decide its routing + public void Dispatch(OrderRequest request, Order order) { + var routingKey = GetRoutingKey(order); + IBusyCollection queue; lock (_lock) { @@ -154,18 +159,47 @@ public void Dispatch(OrderRequest request, int routingKey) } /// - /// Releases the queue pinned to the given routing key once its order reaches a final state, keeping the - /// pin map bounded to the orders still in flight. + /// Releases the queue pinned to an order once it reaches a final state, keeping the pin map bounded to the + /// orders still in flight. A combo group shares a single queue, so it is only released once every leg of + /// the group has completed. /// - /// The routing key previously used in - public void Release(int routingKey) + /// The order that reached a final state + public void Release(Order order) { + var group = order.GroupOrderManager; lock (_lock) { - _queueIndexByKey.Remove(routingKey); + if (group == null || group.Id <= 0) + { + _queueIndexByKey.Remove(order.Id); + return; + } + + // the whole combo routes through one queue keyed by the group id, so we track its completed legs + // and only release the queue once every leg of the group has reached a final state + if (!_completedComboLegs.TryGetValue(group.Id, out var completedLegs)) + { + completedLegs = new HashSet(); + _completedComboLegs[group.Id] = completedLegs; + } + completedLegs.Add(order.Id); + if (completedLegs.Count >= group.Count) + { + _completedComboLegs.Remove(group.Id); + _queueIndexByKey.Remove(group.Id); + } } } + /// + /// Computes the routing key of an order: the combo group id when it belongs to one, otherwise its own id, + /// so that every leg of a combo is routed to the same queue. + /// + private static int GetRoutingKey(Order order) + { + return order.GroupOrderManager?.Id > 0 ? order.GroupOrderManager.Id : order.Id; + } + /// /// Drains the pending order requests on the calling thread. Only used in synchronous mode, where there /// are no worker threads and the caller pumps the single queue itself. From 36fdfb9ccbc1dde60aedee6c767dd81f1fd2ebb9 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Mon, 22 Jun 2026 12:34:35 -0500 Subject: [PATCH 7/8] Address review comments --- .../BacktestingTransactionHandler.cs | 2 +- .../BrokerageTransactionHandler.cs | 53 ++++++++----------- .../OrderRequestProcessingPool.cs | 30 ++++++++--- 3 files changed, 46 insertions(+), 39 deletions(-) diff --git a/Engine/TransactionHandlers/BacktestingTransactionHandler.cs b/Engine/TransactionHandlers/BacktestingTransactionHandler.cs index c1ef72bb43c8..c554fd926de1 100644 --- a/Engine/TransactionHandlers/BacktestingTransactionHandler.cs +++ b/Engine/TransactionHandlers/BacktestingTransactionHandler.cs @@ -62,7 +62,7 @@ public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IRes /// /// For backtesting order requests are processed synchronously by the algorithm thread, only live - /// paper deployments with a concurrency enabled brokerage use background transaction threads + /// deployments with a concurrency enabled brokerage use background transaction threads /// protected override bool SynchronousProcessing => !_enableConcurrency; diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index 274d28120ba2..50238bbd7be6 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -237,26 +237,6 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu /// protected virtual bool SynchronousProcessing => false; - /// - /// Create and start the transaction thread, who will be in charge of processing - /// the order requests - /// - protected virtual void InitializeTransactionThread() - { - Action processRequest = request => - { - HandleOrderRequest(request); - ProcessAsynchronousEvents(); - }; - Action onError = error => _algorithm.SetRuntimeError(error, "HandleOrderRequest"); - - // backtesting drains a single queue synchronously on the algorithm thread, live deployments use - // background worker threads: a single one, or growing on demand up to the maximum when concurrent. - _threadPool = SynchronousProcessing - ? OrderRequestProcessingPool.Synchronous(processRequest, onError) - : new OrderRequestProcessingPool(ConcurrencyEnabled, MinimumTransactionThreads, MaximumTransactionThreads, processRequest, onError); - } - /// /// The maximum number of transaction threads the pool can grow to /// @@ -278,6 +258,26 @@ protected virtual void InitializeTransactionThread() /// public bool IsActive => _threadPool?.IsActive ?? false; + /// + /// Create and start the transaction thread, who will be in charge of processing + /// the order requests + /// + protected virtual void InitializeTransactionThread() + { + Action processRequest = request => + { + HandleOrderRequest(request); + ProcessAsynchronousEvents(); + }; + Action onError = error => _algorithm.SetRuntimeError(error, "HandleOrderRequest"); + + // backtesting drains a single queue synchronously on the algorithm thread, live deployments use + // background worker threads: a single one, or growing on demand up to the maximum when concurrent. + _threadPool = SynchronousProcessing + ? OrderRequestProcessingPool.Synchronous(processRequest, onError) + : new OrderRequestProcessingPool(ConcurrencyEnabled, MinimumTransactionThreads, MaximumTransactionThreads, processRequest, onError); + } + #region Order Request Processing /// @@ -800,17 +800,8 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm) /// public void Exit() { - var timeout = TimeSpan.FromSeconds(60); - if (_threadPool != null) - { - // only wait if a queue is still processing - if (_threadPool.WaitForProcessing(timeout)) - { - Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds."); - } - - _threadPool.Shutdown(timeout); - } + // Shutdown drains the queued requests (CompleteAdding) and waits for the threads before stopping + _threadPool?.Shutdown(TimeSpan.FromSeconds(60)); } /// diff --git a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs index efc31a77eada..69d52ed99052 100644 --- a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs +++ b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs @@ -50,6 +50,8 @@ public class OrderRequestProcessingPool private readonly int _maximumThreads; // true when there are no worker threads and the caller drains the single queue itself private readonly bool _synchronous; + // set under the lock while shutting down so the pool stops growing + private bool _shuttingDown; private readonly Action _processRequest; private readonly Action _onError; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); @@ -230,14 +232,27 @@ public bool WaitForProcessing(TimeSpan timeout) return false; } - List> queues; - lock (_lock) + // re-check each pass so a queue added while we waited is not missed + IBusyCollection busyQueue; + while ((busyQueue = GetBusyQueue()) != null) { - // snapshot under the lock since the queues list may be growing on demand concurrently - queues = _queues.ToList(); + if (!busyQueue.WaitHandle.WaitOne(timeout, _cancellationTokenSource.Token)) + { + return true; + } } + return false; + } - return queues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout, _cancellationTokenSource.Token)); + /// + /// Returns a queue still processing requests, or null if every queue is idle. + /// + private IBusyCollection GetBusyQueue() + { + lock (_lock) + { + return _queues.FirstOrDefault(queue => queue.IsBusy); + } } /// @@ -250,7 +265,8 @@ public void Shutdown(TimeSpan timeout) List threads; lock (_lock) { - // snapshot under the lock since the pool might still be growing on demand concurrently + // stop growing so the snapshot below can't miss a queue/thread added afterwards + _shuttingDown = true; queues = _queues.ToList(); threads = _threads.ToList(); } @@ -288,7 +304,7 @@ private void AddThread() /// private void TryExpand() { - if (_synchronous || _queues.Count >= _maximumThreads || _cancellationTokenSource.IsCancellationRequested) + if (_synchronous || _shuttingDown || _queues.Count >= _maximumThreads || _cancellationTokenSource.IsCancellationRequested) { return; } From 1ada259f78438029de92377cf88a6643a160cd48 Mon Sep 17 00:00:00 2001 From: Josue Nina Date: Mon, 22 Jun 2026 16:39:27 -0500 Subject: [PATCH 8/8] Simplify processing pool shutdown and queue lookup --- .../BrokerageTransactionHandler.cs | 1 - .../OrderRequestProcessingPool.cs | 28 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index 50238bbd7be6..e8a20b56b258 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -1925,7 +1925,6 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity) return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity})"; } - /// /// Holds an order and its state /// diff --git a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs index 69d52ed99052..5ecf02c211c4 100644 --- a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs +++ b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs @@ -44,7 +44,8 @@ public class OrderRequestProcessingPool private readonly Dictionary _queueIndexByKey = new(); // tracks the completed legs of each combo group, so its pinned queue is only released once they are all done private readonly Dictionary> _completedComboLegs = new(); - // guards on demand growth of the queues/threads against concurrent reads in Run/Dispatch/Shutdown + // guards the queues/threads and pin maps against the on demand growth happening concurrently with + // the worker threads, dispatching, releasing and shutdown private readonly object _lock = new object(); // maximum number of threads (and queues) the pool can grow to on demand private readonly int _maximumThreads; @@ -233,10 +234,9 @@ public bool WaitForProcessing(TimeSpan timeout) } // re-check each pass so a queue added while we waited is not missed - IBusyCollection busyQueue; - while ((busyQueue = GetBusyQueue()) != null) + while (TryGetBusyQueue(out var queue)) { - if (!busyQueue.WaitHandle.WaitOne(timeout, _cancellationTokenSource.Token)) + if (!queue.WaitHandle.WaitOne(timeout, _cancellationTokenSource.Token)) { return true; } @@ -245,13 +245,16 @@ public bool WaitForProcessing(TimeSpan timeout) } /// - /// Returns a queue still processing requests, or null if every queue is idle. + /// Gets a queue still processing requests, if any. /// - private IBusyCollection GetBusyQueue() + /// The busy queue found, or null if every queue is idle + /// True if a busy queue was found, false if every queue is idle + private bool TryGetBusyQueue(out IBusyCollection queue) { lock (_lock) { - return _queues.FirstOrDefault(queue => queue.IsBusy); + queue = _queues.FirstOrDefault(q => q.IsBusy); + return queue != null; } } @@ -261,22 +264,19 @@ private IBusyCollection GetBusyQueue() /// The maximum time to wait for each thread to stop public void Shutdown(TimeSpan timeout) { - List> queues; - List threads; lock (_lock) { - // stop growing so the snapshot below can't miss a queue/thread added afterwards + // stop growing so no queue/thread can be added while we shut down, which leaves the + // collections frozen and safe to iterate without taking a snapshot _shuttingDown = true; - queues = _queues.ToList(); - threads = _threads.ToList(); } - foreach (var queue in queues) + foreach (var queue in _queues) { queue.CompleteAdding(); } - foreach (var thread in threads) + foreach (var thread in _threads) { thread?.StopSafely(timeout, _cancellationTokenSource); }