diff --git a/Engine/TransactionHandlers/BacktestingTransactionHandler.cs b/Engine/TransactionHandlers/BacktestingTransactionHandler.cs index 8d26ff193315..c554fd926de1 100644 --- a/Engine/TransactionHandlers/BacktestingTransactionHandler.cs +++ b/Engine/TransactionHandlers/BacktestingTransactionHandler.cs @@ -58,14 +58,14 @@ public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IRes _enableConcurrency = _brokerage.ConcurrencyEnabled && _algorithm.LiveMode; base.Initialize(algorithm, brokerage, resultHandler); - - if (!_enableConcurrency) - { - // non blocking implementation - _orderRequestQueues = new() { new BusyCollection() }; - } } + /// + /// For backtesting order requests are processed synchronously by the algorithm thread, only live + /// deployments with a concurrency enabled brokerage use background transaction threads + /// + protected override bool SynchronousProcessing => !_enableConcurrency; + /// /// Processes all synchronous events that must take place before the next time loop for the algorithm /// @@ -74,7 +74,7 @@ public override void ProcessSynchronousEvents() if (!_enableConcurrency) { // we process pending order requests our selves - Run(0); + ProcessPendingRequests(); } base.ProcessSynchronousEvents(); @@ -113,7 +113,7 @@ protected override void WaitForOrderSubmission(OrderTicket ticket) } // we submit the order request our selves - Run(0); + ProcessPendingRequests(); if (!ticket.OrderSet.WaitOne(0)) { @@ -124,18 +124,5 @@ protected override void WaitForOrderSubmission(OrderTicket ticket) "See the OrderRequest.Response for more information"); } } - - /// - /// For backtesting order requests will be processed by the algorithm thread - /// sequentially at and - /// - protected override void InitializeTransactionThread() - { - if (_enableConcurrency) - { - // let the base class handle this - base.InitializeTransactionThread(); - } - } } } diff --git a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs index 53bf641716c7..e8a20b56b258 100644 --- a/Engine/TransactionHandlers/BrokerageTransactionHandler.cs +++ b/Engine/TransactionHandlers/BrokerageTransactionHandler.cs @@ -70,13 +70,10 @@ public class BrokerageTransactionHandler : ITransactionHandler private int _failedCashSyncAttempts; /// - /// OrderQueue holds the newly updated orders from the user algorithm waiting to be processed. Once - /// orders are processed they are moved into the Orders queue awaiting the brokerage response. + /// Holds the worker threads and their queues, dispatching each order request to the queue pinned to + /// its order and growing the pool on demand as the threads get saturated. /// - protected List> _orderRequestQueues { get; set; } - - private List _processingThreads; - private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + protected OrderRequestProcessingPool _threadPool; private readonly ConcurrentQueue _orderEvents = new ConcurrentQueue(); @@ -210,8 +207,6 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu HandleOrderUpdated(e); }; - IsActive = true; - if (_algorithm is QCAlgorithm qcAlgorithm) { _qcAlgorithmInstance = qcAlgorithm; @@ -230,35 +225,58 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu InitializeTransactionThread(); } + /// + /// Whether the transaction thread pool can grow on demand to process order requests concurrently. + /// When false a single worker thread is used. + /// + protected virtual bool ConcurrencyEnabled => _brokerage.ConcurrencyEnabled; + + /// + /// Whether order requests are drained synchronously by the algorithm thread instead of by background + /// worker threads. Used by backtesting deployments. + /// + protected virtual bool SynchronousProcessing => false; + + /// + /// The maximum number of transaction threads the pool can grow to + /// + protected virtual int MaximumTransactionThreads => Config.GetInt("maximum-transaction-threads", 10); + + /// + /// The number of transaction threads the pool starts with + /// + protected virtual int MinimumTransactionThreads => Config.GetInt("minimum-transaction-threads", 2); + + /// + /// The number of transaction threads currently running + /// + protected int ProcessingThreadsCount => _threadPool?.ThreadCount ?? 0; + + /// + /// Boolean flag indicating the transaction threads are busy. + /// False indicates they are completely finished processing and ready to be terminated. + /// + public bool IsActive => _threadPool?.IsActive ?? false; + /// /// Create and start the transaction thread, who will be in charge of processing /// the order requests /// protected virtual void InitializeTransactionThread() { - // multi threaded queue, used for live deployments - var processingThreadsCount = _brokerage.ConcurrencyEnabled - ? Config.GetInt("maximum-transaction-threads", 4) - : 1; - _orderRequestQueues = new(processingThreadsCount); - _processingThreads = new(processingThreadsCount); - for (var i = 0; i < processingThreadsCount; i++) - { - _orderRequestQueues.Add(new BusyBlockingCollection()); - var threadId = i; // avoid modified closure - _processingThreads.Add(new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {i}" }); - } - foreach (var thread in _processingThreads) + Action processRequest = request => { - thread.Start(); - } - } + HandleOrderRequest(request); + ProcessAsynchronousEvents(); + }; + Action onError = error => _algorithm.SetRuntimeError(error, "HandleOrderRequest"); - /// - /// Boolean flag indicating the Run thread method is busy. - /// False indicates it is completely finished processing and ready to be terminated. - /// - public bool IsActive { get; private set; } + // backtesting drains a single queue synchronously on the algorithm thread, live deployments use + // background worker threads: a single one, or growing on demand up to the maximum when concurrent. + _threadPool = SynchronousProcessing + ? OrderRequestProcessingPool.Synchronous(processRequest, onError) + : new OrderRequestProcessingPool(ConcurrencyEnabled, MinimumTransactionThreads, MaximumTransactionThreads, processRequest, onError); + } #region Order Request Processing @@ -338,7 +356,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request) order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close); _openOrders[order.Id] = new OpenOrderState(order, ticket, security); - EnqueueOrderRequest(request, order); + _threadPool.Dispatch(request, order); WaitForOrderSubmission(ticket); } @@ -366,7 +384,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request) } /// - /// Wait for the order to be handled by the + /// Wait for the order to be handled by the /// /// The expecting to be submitted protected virtual void WaitForOrderSubmission(OrderTicket ticket) @@ -454,7 +472,7 @@ public OrderTicket UpdateOrder(UpdateOrderRequest request) else { request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing); - EnqueueOrderRequest(request, order); + _threadPool.Dispatch(request, order); } } catch (Exception err) @@ -526,7 +544,7 @@ public OrderTicket CancelOrder(CancelOrderRequest request) // send the request to be processed request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing); - EnqueueOrderRequest(request, order); + _threadPool.Dispatch(request, order); } } catch (Exception err) @@ -674,29 +692,12 @@ public List GetOpenOrders(Func filter = null) } /// - /// Primary thread entry point to launch the transaction thread. + /// Drains the pending order requests on the calling thread. Used by synchronous (non concurrent) + /// deployments, where the algorithm thread pumps the request queue itself. /// - protected void Run(int threadId) + protected void ProcessPendingRequests() { - try - { - foreach (var request in _orderRequestQueues[threadId].GetConsumingEnumerable(_cancellationTokenSource.Token)) - { - HandleOrderRequest(request); - ProcessAsynchronousEvents(); - } - } - catch (Exception err) - { - // unexpected error, we need to close down shop - _algorithm.SetRuntimeError(err, "HandleOrderRequest"); - } - - if (_processingThreads != null) - { - Log.Trace($"BrokerageTransactionHandler.Run(): Ending Thread {threadId}..."); - IsActive = false; - } + _threadPool.ProcessPending(); } /// @@ -717,7 +718,7 @@ public virtual void ProcessSynchronousEvents() // in backtesting we need to wait for orders to be removed from the queue and finished processing if (!_algorithm.LiveMode) { - if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))) + if (_threadPool.WaitForProcessing(Time.OneSecond)) { Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing."); } @@ -799,27 +800,8 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm) /// public void Exit() { - var timeout = TimeSpan.FromSeconds(60); - if (_processingThreads != null) - { - // only wait if the processing thread is running - if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout))) - { - Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds."); - } - - foreach (var queue in _orderRequestQueues) - { - queue.CompleteAdding(); - } - - foreach (var thread in _processingThreads) - { - thread?.StopSafely(timeout, _cancellationTokenSource); - } - } - IsActive = false; - _cancellationTokenSource.DisposeSafely(); + // Shutdown drains the queued requests (CompleteAdding) and waits for the threads before stopping + _threadPool?.Shutdown(TimeSpan.FromSeconds(60)); } /// @@ -1227,6 +1209,12 @@ private void HandleOrderEvents(List orderEvents) order.Status = orderEvent.Status; } + // notify the pool once an order reaches a final state so it can release its processing queue + if (order.Status.IsClosed()) + { + _threadPool.Release(order); + } + orderEvent.Id = order.GetNewId(); // set the modified time of the order to the fill's timestamp @@ -1937,16 +1925,6 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity) return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity})"; } - private void EnqueueOrderRequest(OrderRequest request, Order order) - { - var queueKey = request.OrderId; - if (order.GroupOrderManager?.Id > 0) - { - queueKey = order.GroupOrderManager.Id; - } - _orderRequestQueues[queueKey % _orderRequestQueues.Count].Add(request); - } - /// /// Holds an order and its state /// diff --git a/Engine/TransactionHandlers/OrderRequestProcessingPool.cs b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs new file mode 100644 index 000000000000..5ecf02c211c4 --- /dev/null +++ b/Engine/TransactionHandlers/OrderRequestProcessingPool.cs @@ -0,0 +1,362 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using QuantConnect.Interfaces; +using QuantConnect.Logging; +using QuantConnect.Orders; +using QuantConnect.Util; + +namespace QuantConnect.Lean.Engine.TransactionHandlers +{ + /// + /// Holds the worker threads and their queues used to process order requests, dispatching each + /// request to the queue pinned to its order and growing the pool on demand when it gets saturated. + /// + /// + /// In concurrent mode each thread owns a single it consumes, + /// the pool starts at the minimum number of threads and grows up to the maximum when every thread is + /// busy with pending work. In synchronous mode there are no worker threads: a single non blocking queue + /// is drained on the caller thread via . + /// + public class OrderRequestProcessingPool + { + // one queue per worker thread; the newly updated order requests wait here to be processed + private readonly List> _queues; + private readonly List _threads; + // pins each order (or combo group) to one queue for its whole life, so all its requests are handled + // in order by the same thread even after the pool grows and re-routes new orders to other queues + private readonly Dictionary _queueIndexByKey = new(); + // tracks the completed legs of each combo group, so its pinned queue is only released once they are all done + private readonly Dictionary> _completedComboLegs = new(); + // guards the queues/threads and pin maps against the on demand growth happening concurrently with + // the worker threads, dispatching, releasing and shutdown + private readonly object _lock = new object(); + // maximum number of threads (and queues) the pool can grow to on demand + private readonly int _maximumThreads; + // true when there are no worker threads and the caller drains the single queue itself + private readonly bool _synchronous; + // set under the lock while shutting down so the pool stops growing + private bool _shuttingDown; + private readonly Action _processRequest; + private readonly Action _onError; + private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + + /// + /// True while the pool is processing order requests, false once its worker threads have finished. + /// + public bool IsActive { get; private set; } + + /// + /// The number of worker threads currently running. + /// + public int ThreadCount + { + get + { + lock (_lock) + { + return _threads.Count; + } + } + } + + /// + /// Creates a threaded pool and starts its initial worker threads. When concurrency is enabled the pool + /// starts at and grows on demand up to , + /// otherwise it runs a single fixed worker thread. + /// + /// True to grow the pool on demand, false to run a single worker thread + /// The number of worker threads the pool starts with when growing + /// The maximum number of worker threads the pool can grow to on demand + /// Handles a single order request + /// Invoked when processing fails unexpectedly + public OrderRequestProcessingPool(bool concurrencyEnabled, int minimumThreads, int maximumThreads, + Action processRequest, Action onError) + { + _synchronous = false; + _processRequest = processRequest; + _onError = onError; + // concurrency grows the pool minimum..maximum on demand, otherwise a single fixed thread is used + _maximumThreads = concurrencyEnabled ? Math.Max(1, maximumThreads) : 1; + var initialThreadsCount = concurrencyEnabled ? Math.Min(Math.Max(1, minimumThreads), _maximumThreads) : 1; + + _queues = new(_maximumThreads); + _threads = new(_maximumThreads); + IsActive = true; + for (var i = 0; i < initialThreadsCount; i++) + { + AddThread(); + } + } + + /// + /// Private constructor for the synchronous pool: a single non blocking queue and no worker threads. + /// + private OrderRequestProcessingPool(Action processRequest, Action onError) + { + _synchronous = true; + _processRequest = processRequest; + _onError = onError; + _maximumThreads = 1; + + _queues = new(1) { new BusyCollection() }; + _threads = new(0); + IsActive = true; + } + + /// + /// Creates a synchronous pool with no worker threads: its single queue is drained on the caller thread + /// via . + /// + /// Handles a single order request + /// Invoked when processing fails unexpectedly + public static OrderRequestProcessingPool Synchronous(Action processRequest, Action onError) + { + return new OrderRequestProcessingPool(processRequest, onError); + } + + /// + /// Dispatches an order request to the queue pinned to its order, growing the pool first if every existing + /// thread is already saturated. All the requests of an order, and of every leg of a combo group, are routed + /// to the same queue so they are processed in order by a single thread. + /// + /// The order request to process + /// The order the request belongs to, used to decide its routing + public void Dispatch(OrderRequest request, Order order) + { + var routingKey = GetRoutingKey(order); + + IBusyCollection queue; + lock (_lock) + { + // grow the pool first if every existing thread is already saturated + TryExpand(); + + // reuse the order's pinned queue if it has one, so it is never re-routed when the pool grows + if (!_queueIndexByKey.TryGetValue(routingKey, out var queueIndex)) + { + queueIndex = routingKey % _queues.Count; + _queueIndexByKey[routingKey] = queueIndex; + } + queue = _queues[queueIndex]; + } + + // add outside the lock, since it can block when the queue is at its bounded capacity + queue.Add(request); + } + + /// + /// Releases the queue pinned to an order once it reaches a final state, keeping the pin map bounded to the + /// orders still in flight. A combo group shares a single queue, so it is only released once every leg of + /// the group has completed. + /// + /// The order that reached a final state + public void Release(Order order) + { + var group = order.GroupOrderManager; + lock (_lock) + { + if (group == null || group.Id <= 0) + { + _queueIndexByKey.Remove(order.Id); + return; + } + + // the whole combo routes through one queue keyed by the group id, so we track its completed legs + // and only release the queue once every leg of the group has reached a final state + if (!_completedComboLegs.TryGetValue(group.Id, out var completedLegs)) + { + completedLegs = new HashSet(); + _completedComboLegs[group.Id] = completedLegs; + } + completedLegs.Add(order.Id); + if (completedLegs.Count >= group.Count) + { + _completedComboLegs.Remove(group.Id); + _queueIndexByKey.Remove(group.Id); + } + } + } + + /// + /// Computes the routing key of an order: the combo group id when it belongs to one, otherwise its own id, + /// so that every leg of a combo is routed to the same queue. + /// + private static int GetRoutingKey(Order order) + { + return order.GroupOrderManager?.Id > 0 ? order.GroupOrderManager.Id : order.Id; + } + + /// + /// Drains the pending order requests on the calling thread. Only used in synchronous mode, where there + /// are no worker threads and the caller pumps the single queue itself. + /// + public void ProcessPending() + { + try + { + Consume(_queues[0]); + } + catch (Exception err) + { + // unexpected error, we need to close down shop + _onError(err); + } + } + + /// + /// Waits for every queue to finish processing its pending requests, up to the given timeout. + /// + /// The maximum time to wait + /// True if any queue was still busy when the timeout elapsed + public bool WaitForProcessing(TimeSpan timeout) + { + // synchronous mode has no worker thread to drain the queue, the caller pumps it via ProcessPending + if (_synchronous) + { + return false; + } + + // re-check each pass so a queue added while we waited is not missed + while (TryGetBusyQueue(out var queue)) + { + if (!queue.WaitHandle.WaitOne(timeout, _cancellationTokenSource.Token)) + { + return true; + } + } + return false; + } + + /// + /// Gets a queue still processing requests, if any. + /// + /// The busy queue found, or null if every queue is idle + /// True if a busy queue was found, false if every queue is idle + private bool TryGetBusyQueue(out IBusyCollection queue) + { + lock (_lock) + { + queue = _queues.FirstOrDefault(q => q.IsBusy); + return queue != null; + } + } + + /// + /// Stops every worker thread and waits for them to terminate, up to the given timeout. + /// + /// The maximum time to wait for each thread to stop + public void Shutdown(TimeSpan timeout) + { + lock (_lock) + { + // stop growing so no queue/thread can be added while we shut down, which leaves the + // collections frozen and safe to iterate without taking a snapshot + _shuttingDown = true; + } + + foreach (var queue in _queues) + { + queue.CompleteAdding(); + } + + foreach (var thread in _threads) + { + thread?.StopSafely(timeout, _cancellationTokenSource); + } + + IsActive = false; + _cancellationTokenSource.DisposeSafely(); + } + + /// + /// Creates a queue and its dedicated worker thread and starts it. + /// Callers growing the pool on demand must hold . + /// + private void AddThread() + { + var threadId = _queues.Count; // matches the queue index this thread will consume + _queues.Add(new BusyBlockingCollection()); + var thread = new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {threadId}" }; + _threads.Add(thread); + thread.Start(); + } + + /// + /// Grows the pool only when every thread is busy and still has pending requests, up to the maximum. + /// Caller must hold . + /// + private void TryExpand() + { + if (_synchronous || _shuttingDown || _queues.Count >= _maximumThreads || _cancellationTokenSource.IsCancellationRequested) + { + return; + } + + // only grow when the whole pool is saturated: every thread busy and with requests still waiting + for (var i = 0; i < _queues.Count; i++) + { + var queue = _queues[i]; + if (!queue.IsBusy || queue.Count == 0) + { + return; + } + } + + AddThread(); + } + + /// + /// Worker thread entry point: consumes its queue until the pool is shut down. + /// + private void Run(int threadId) + { + IBusyCollection queue; + lock (_lock) + { + // capture our queue safely, the queues list may be growing on demand concurrently + queue = _queues[threadId]; + } + + try + { + Consume(queue); + } + catch (Exception err) + { + // unexpected error, we need to close down shop + _onError(err); + } + + Log.Trace($"OrderRequestProcessingPool.Run(): Ending Thread {threadId}..."); + IsActive = false; + } + + /// + /// Processes every request the queue yields, handing each one to the configured processor. + /// + private void Consume(IBusyCollection queue) + { + foreach (var request in queue.GetConsumingEnumerable(_cancellationTokenSource.Token)) + { + _processRequest(request); + } + } + } +} diff --git a/Launcher/config.json b/Launcher/config.json index f0498a0996ab..03af8dd9c5f9 100644 --- a/Launcher/config.json +++ b/Launcher/config.json @@ -58,7 +58,9 @@ "ignore-unknown-asset-holdings": true, // The maximum amount of transaction threads for concurrent order submissions if the brokerage supports it. - //"maximum-transaction-threads": 4, + // The pool starts at the minimum and grows up to the maximum on demand. + //"minimum-transaction-threads": 2, + //"maximum-transaction-threads": 10, // log missing data files, useful for debugging "show-missing-data-logs": false, diff --git a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs index ffb9fc557da2..7a754ef47dfb 100644 --- a/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs +++ b/Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs @@ -2514,6 +2514,203 @@ public void ProcessesComboRequestsOnSameThreadWhenConcurrencyIsEnabled() } } + [Test] + public void TransactionThreadPoolStartsAtMinimumThreads() + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + using var finishedEvent = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(1, finishedEvent); + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + // the pool starts with the minimum number of threads and grows only on demand + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + } + finally + { + transactionHandler.Exit(); + } + } + + [TestCase(10)] + [TestCase(3)] + public void TransactionThreadPoolGrowsUnderBacklogUpToMaximum(int maximumThreads) + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + + using var finishedEvent = new ManualResetEventSlim(false); + using var gate = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) + { + Gate = gate, + MaxThreadsOverride = maximumThreads + }; + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + algorithm.Transactions.SetOrderProcessor(transactionHandler); + + var security = (Security)algorithm.AddEquity("SPY"); + algorithm.SetFinishedWarmingUp(); + + var reference = new DateTime(2025, 07, 03, 10, 0, 0); + security.SetMarketPrice(new Tick(reference, security.Symbol, 300, 300)); + + // starts at the minimum + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + + // keep feeding orders while threads stay blocked on the gate, forcing the pool to grow to the max + var orderId = 0; + var reachedMax = SpinWait.SpinUntil(() => + { + if (orderId < 1000) + { + var request = MakeAsyncMarketRequest(security, reference); + request.SetOrderId(++orderId); + transactionHandler.Process(request); + } + return transactionHandler.ActiveThreadCount >= maximumThreads; + }, 10000); + + Assert.IsTrue(reachedMax, $"Pool did not grow to the maximum, current size: {transactionHandler.ActiveThreadCount}"); + // never grows beyond the configured maximum + Assert.AreEqual(maximumThreads, transactionHandler.ActiveThreadCount); + } + finally + { + gate.Set(); + transactionHandler.Exit(); + } + } + + [Test] + public void KeepsAnOrderOnTheSameThreadAfterThePoolGrows() + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + using var finishedEvent = new ManualResetEventSlim(false); + using var gate = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) + { + Gate = gate, + MaxThreadsOverride = 10 + }; + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + algorithm.Transactions.SetOrderProcessor(transactionHandler); + algorithm.SetCash(100000); + algorithm.SetFinishedWarmingUp(); + + var security1 = (Security)algorithm.AddEquity("SPY"); + var security2 = (Security)algorithm.AddEquity("AAPL"); + + var reference = new DateTime(2025, 07, 03, 10, 0, 0); + security1.SetMarketPrice(new Tick(reference, security1.Symbol, 500, 500)); + security2.SetMarketPrice(new Tick(reference, security2.Symbol, 200, 200)); + + // group id 2 pins to queue 0 (2 % 2) while the pool is at the minimum; once it grows to >= 3 + // an un-pinned request would route to queue 2 (2 % count), so this scenario detects re-routing + var groupOrderManager = new GroupOrderManager(2, 2, -1, 1m); + var leg1 = new SubmitOrderRequest(OrderType.ComboLimit, security1.Type, security1.Symbol, -1, 1m, 0, reference, "", + groupOrderManager: groupOrderManager); + leg1.SetOrderId(1); + transactionHandler.Process(leg1); + + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + + // saturate the pool with unrelated orders so it grows past the minimum + var orderId = 100; + var grew = SpinWait.SpinUntil(() => + { + if (orderId < 1100) + { + var request = MakeAsyncMarketRequest(security1, reference); + request.SetOrderId(++orderId); + transactionHandler.Process(request); + } + return transactionHandler.ActiveThreadCount >= 3; + }, 10000); + Assert.IsTrue(grew, $"the pool did not grow, current size: {transactionHandler.ActiveThreadCount}"); + + // leg 2 of the same combo arrives after the pool grew; the pin must keep it on the original queue + var leg2 = new SubmitOrderRequest(OrderType.ComboLimit, security2.Type, security2.Symbol, 1, 1m, 0, reference, "", + groupOrderManager: groupOrderManager); + leg2.SetOrderId(2); + transactionHandler.Process(leg2); + + gate.Set(); + + // both legs must have been handled by the same thread despite the pool growing in between + Assert.IsTrue(SpinWait.SpinUntil(() => + transactionHandler.RequestProcessingThreads.ContainsKey(leg1.OrderId) && + transactionHandler.RequestProcessingThreads.ContainsKey(leg2.OrderId), 10000), + "the combo legs were not processed"); + Assert.AreEqual(transactionHandler.RequestProcessingThreads[leg1.OrderId], + transactionHandler.RequestProcessingThreads[leg2.OrderId]); + } + finally + { + gate.Set(); + transactionHandler.Exit(); + } + } + + [Test] + public void DoesNotGrowWhenThePoolIsNotSaturated() + { + var algorithm = new TestAlgorithm(); + using var brokerage = new TestingConcurrentBrokerage(); + using var finishedEvent = new ManualResetEventSlim(false); + using var gate = new ManualResetEventSlim(false); + var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(int.MaxValue, finishedEvent) + { + Gate = gate, + MaxThreadsOverride = 10 + }; + transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler()); + + try + { + algorithm.Transactions.SetOrderProcessor(transactionHandler); + algorithm.SetFinishedWarmingUp(); + + var security = (Security)algorithm.AddEquity("SPY"); + var reference = new DateTime(2025, 07, 03, 10, 0, 0); + security.SetMarketPrice(new Tick(reference, security.Symbol, 300, 300)); + + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + + // all even order ids route to the same queue (id % 2 == 0), keeping the other thread idle, + // so even with a backlog on one queue the pool must not grow + for (var i = 1; i <= 20; i++) + { + var request = MakeAsyncMarketRequest(security, reference); + request.SetOrderId(i * 2); + transactionHandler.Process(request); + } + + // growth is evaluated synchronously on each enqueue, so the count is final here + Assert.AreEqual(2, transactionHandler.ActiveThreadCount); + } + finally + { + gate.Set(); + transactionHandler.Exit(); + } + } + + private static SubmitOrderRequest MakeAsyncMarketRequest(Security security, DateTime date) + { + return new SubmitOrderRequest(OrderType.Market, security.Type, security.Symbol, 1, 0, 0, 0, 0, false, date, "", + asynchronous: true); + } + [TestCase("OnAccountChanged")] [TestCase("OnOptionNotification")] [TestCase("OnNewBrokerageOrderNotification")] @@ -2765,6 +2962,9 @@ public class TestBrokerageTransactionHandler : BrokerageTransactionHandler protected override TimeSpan TimeSinceLastFill => TestTimeSinceLastFill; + // no worker thread: these tests drive HandleOrderRequest manually + protected override bool SynchronousProcessing => true; + public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResultHandler resultHandler) { _brokerage = brokerage; @@ -2777,11 +2977,6 @@ public DateTime GetLastSyncDate() return _brokerage.LastSyncDateTimeUtc.ConvertFromUtc(TimeZones.NewYork); } - protected override void InitializeTransactionThread() - { - _orderRequestQueues = new() { new BusyCollection() }; - } - public new void RoundOrderPrices(Order order, Security security) { base.RoundOrderPrices(order, security); @@ -2875,6 +3070,15 @@ private class TestableConcurrentBrokerageTransactionHandler : BrokerageTransacti public ConcurrentDictionary RequestProcessingThreads = new(); + // blocks threads so requests pile up and force the pool to grow + public ManualResetEventSlim Gate; + + public int ActiveThreadCount => ProcessingThreadsCount; + + // overrides the pool maximum without touching the global Config + public int? MaxThreadsOverride { get; set; } + protected override int MaximumTransactionThreads => MaxThreadsOverride ?? base.MaximumTransactionThreads; + public TestableConcurrentBrokerageTransactionHandler(int expectedOrdersCount, ManualResetEventSlim finishedEvent) { _expectedOrdersCount = expectedOrdersCount; @@ -2883,6 +3087,8 @@ public TestableConcurrentBrokerageTransactionHandler(int expectedOrdersCount, Ma public override void HandleOrderRequest(OrderRequest request) { + Gate?.Wait(); + base.HandleOrderRequest(request); // Capture the thread name for debugging purposes