Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions libs/server/Transaction/TransactionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ internal void Reset(bool isRunning)
{
txnKeysParseState.Count = 0;
saveKeyRecvBufferPtr = null;
firstKeyInCurrentRecvBuffer = 0;
txnScratchBufferAllocator.Reset();
}
}
Expand Down Expand Up @@ -339,12 +340,8 @@ internal void AddTransactionStoreType(StoreType storeType)

internal void GetSlotVerificationInput(byte* recvBufferPtr, byte sessionAsking, out ClusterSlotVerificationInput clusterSlotVerificationInput)
{
// Copy keys if buffer changed since last queued command
if (recvBufferPtr != saveKeyRecvBufferPtr)
{
CopyExistingKeysToScratchBuffer();
saveKeyRecvBufferPtr = recvBufferPtr;
}
// Materialize only keys captured from the previous receive buffer when it changes.
OnRecvBufferChanged(recvBufferPtr);

watchContainer.SaveKeysToKeyList(this);
clusterSlotVerificationInput = new ClusterSlotVerificationInput
Expand Down
29 changes: 28 additions & 1 deletion libs/server/Transaction/TxnClusterSlotCheck.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,27 @@ sealed unsafe partial class TransactionManager
{
readonly bool clusterEnabled;
internal byte* saveKeyRecvBufferPtr;
int firstKeyInCurrentRecvBuffer;

public void BeginKeyTrackingForCurrentBuffer(byte* recvBufferPtr)
{
if (!clusterEnabled) return;

saveKeyRecvBufferPtr = recvBufferPtr;
firstKeyInCurrentRecvBuffer = txnKeysParseState.Count;
}

public void OnRecvBufferChanged(byte* recvBufferPtr)
{
if (!clusterEnabled || recvBufferPtr == saveKeyRecvBufferPtr)
return;

Debug.Assert(firstKeyInCurrentRecvBuffer <= txnKeysParseState.Count);

CopyKeysToScratchBuffer(firstKeyInCurrentRecvBuffer);
firstKeyInCurrentRecvBuffer = txnKeysParseState.Count;
saveKeyRecvBufferPtr = recvBufferPtr;
}

/// <summary>
/// Keep track of actual key accessed by command
Expand Down Expand Up @@ -37,7 +58,13 @@ public void CopyExistingKeysToScratchBuffer()
{
Debug.Assert(clusterEnabled);

for (var i = 0; i < txnKeysParseState.Count; i++)
CopyKeysToScratchBuffer(0);
firstKeyInCurrentRecvBuffer = txnKeysParseState.Count;
}

void CopyKeysToScratchBuffer(int startIndex)
{
for (var i = startIndex; i < txnKeysParseState.Count; i++)
{
ref var key = ref txnKeysParseState.GetArgSliceByRef(i);
key = txnScratchBufferAllocator.CreateArgSlice(key.ReadOnlySpan);
Expand Down
11 changes: 4 additions & 7 deletions libs/server/Transaction/TxnRespCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ private bool NetworkMULTI()
txnManager.txnStartHead = readHead;
txnManager.state = TxnState.Started;
txnManager.operationCntTxn = 0;
// Track receive buffer ptr for key pointer adjustment at EXEC time
txnManager.saveKeyRecvBufferPtr = recvBufferPtr;
// Track receive buffer for incremental key materialization during queuing
txnManager.BeginKeyTrackingForCurrentBuffer(recvBufferPtr);

while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
Expand Down Expand Up @@ -184,11 +184,8 @@ private bool NetworkSKIP(RespCommand cmd)
return true;
}

if (clusterSession != null && recvBufferPtr != txnManager.saveKeyRecvBufferPtr)
{
txnManager.CopyExistingKeysToScratchBuffer();
txnManager.saveKeyRecvBufferPtr = recvBufferPtr;
}
if (clusterSession != null)
txnManager.OnRecvBufferChanged(recvBufferPtr);

txnManager.LockKeys(commandInfo);

Expand Down
10 changes: 10 additions & 0 deletions playground/_perf_probe_txn/PerfProbe/PerfProbe.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>disable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
115 changes: 115 additions & 0 deletions playground/_perf_probe_txn/PerfProbe/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;

const int WarmupIterations = 5;
const int MeasureIterations = 15;

var scenarios = new[]
{
new Scenario("Typical", 5000, 32, 32),
new Scenario("Stress", 20000, 48, 8),
};

Console.WriteLine("Txn key-copy perf probe (old vs new)");
Console.WriteLine();

foreach (var scenario in scenarios)
{
var oldStats = Measure(scenario, RunOld);
var newStats = Measure(scenario, RunNew);

var speedup = oldStats.MeanMs / newStats.MeanMs;
var allocReduction = 100.0 * (oldStats.MeanAllocBytes - newStats.MeanAllocBytes) / oldStats.MeanAllocBytes;

Console.WriteLine($"Scenario: {scenario.Name}");
Console.WriteLine($" Old : {oldStats.MeanMs,8:F2} ms, alloc {oldStats.MeanAllocBytes / (1024.0 * 1024.0),8:F2} MB");
Console.WriteLine($" New : {newStats.MeanMs,8:F2} ms, alloc {newStats.MeanAllocBytes / (1024.0 * 1024.0),8:F2} MB");
Console.WriteLine($" Delta: {speedup,8:F2}x faster, alloc {allocReduction,8:F2}% lower");
Console.WriteLine();
}

static Stats Measure(Scenario scenario, Action<Scenario> run)
{
for (var i = 0; i < WarmupIterations; i++)
run(scenario);

var totalMs = 0.0;
var totalAlloc = 0L;

for (var i = 0; i < MeasureIterations; i++)
{
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();

var allocStart = GC.GetAllocatedBytesForCurrentThread();
var sw = Stopwatch.StartNew();
run(scenario);
sw.Stop();
var allocEnd = GC.GetAllocatedBytesForCurrentThread();

totalMs += sw.Elapsed.TotalMilliseconds;
totalAlloc += allocEnd - allocStart;
}

return new Stats(totalMs / MeasureIterations, totalAlloc / MeasureIterations);
}

static void RunOld(Scenario s)
{
var txnKeys = new List<byte[]>(s.TotalKeys);
var scratch = new List<byte[]>(s.TotalKeys * 4);

for (var i = 0; i < s.TotalKeys; i++)
{
if (i > 0 && i % s.BufferSwitchEvery == 0)
{
// Old behavior: copy all tracked keys each time receive buffer changes.
for (var k = 0; k < txnKeys.Count; k++)
{
var src = txnKeys[k];
var dst = new byte[src.Length];
Buffer.BlockCopy(src, 0, dst, 0, src.Length);
scratch.Add(dst);
txnKeys[k] = dst;
}
}

var key = new byte[s.KeySize];
key[0] = (byte)i;
txnKeys.Add(key);
}
}

static void RunNew(Scenario s)
{
var txnKeys = new List<byte[]>(s.TotalKeys);
var scratch = new List<byte[]>(s.TotalKeys * 2);
var firstKeyInCurrentBuffer = 0;

for (var i = 0; i < s.TotalKeys; i++)
{
if (i > 0 && i % s.BufferSwitchEvery == 0)
{
// New behavior: copy only keys from the previous receive buffer segment.
for (var k = firstKeyInCurrentBuffer; k < txnKeys.Count; k++)
{
var src = txnKeys[k];
var dst = new byte[src.Length];
Buffer.BlockCopy(src, 0, dst, 0, src.Length);
scratch.Add(dst);
txnKeys[k] = dst;
}

firstKeyInCurrentBuffer = txnKeys.Count;
}

var key = new byte[s.KeySize];
key[0] = (byte)i;
txnKeys.Add(key);
}
}

readonly record struct Scenario(string Name, int TotalKeys, int KeySize, int BufferSwitchEvery);
readonly record struct Stats(double MeanMs, long MeanAllocBytes);