Skip to content

Commit 233a1f0

Browse files
committed
renaming and comments
1 parent 9152ab2 commit 233a1f0

File tree

2 files changed

+24
-13
lines changed

2 files changed

+24
-13
lines changed

libs/server/AOF/AofProcessor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic
235235
_ = storeWrapper.TakeCheckpoint(false, logger);
236236

237237
// Process buffered records
238-
aofReplayBuffer.ProcessBufferedRecords(storeWrapper.store.CurrentVersion, asReplica);
238+
aofReplayBuffer.ProcessFuzzyRegionRecords(storeWrapper.store.CurrentVersion, asReplica);
239239
aofReplayBuffer.ClearFuzzyRegionBuffer();
240240
}
241241
}
@@ -317,7 +317,7 @@ private unsafe bool ReplayOp(byte* entryPtr, int length, bool replayAsReplica)
317317
RunStoredProc(header.procedureId, customProcInput, entryPtr);
318318
break;
319319
case AofEntryType.TxnCommit:
320-
aofReplayBuffer.ProcessNextTransactionBatch(replayAsReplica);
320+
aofReplayBuffer.ProcessFuzzyRegionTransactions(replayAsReplica);
321321
break;
322322
default:
323323
throw new GarnetException($"Unknown AOF header operation type {header.opType}");

libs/server/AOF/AofReplayBuffer.cs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@ public sealed unsafe partial class AofProcessor
1515
public class AofReplayBuffer(AofProcessor aofProcessor, ILogger logger = null)
1616
{
1717
readonly AofProcessor aofProcessor = aofProcessor;
18-
readonly List<byte[]> fuzzyRegionBuffer = [];
18+
readonly List<byte[]> fuzzyRegionOps = [];
1919
readonly Queue<List<byte[]>> txnBatchBuffer = [];
2020
readonly Dictionary<int, List<byte[]>> activeTxns = [];
2121
readonly ILogger logger = logger;
2222

23-
internal int FuzzyRegionBufferCount => fuzzyRegionBuffer.Count;
23+
internal int FuzzyRegionBufferCount => fuzzyRegionOps.Count;
2424

25-
internal void ClearFuzzyRegionBuffer() => fuzzyRegionBuffer.Clear();
25+
internal void ClearFuzzyRegionBuffer() => fuzzyRegionOps.Clear();
2626

27-
internal unsafe void TryAddOperation(byte* ptr, int length) => fuzzyRegionBuffer.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
27+
internal unsafe void TryAddOperation(byte* ptr, int length) => fuzzyRegionOps.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
2828

2929
internal unsafe bool TryAddTransactionOperation(AofHeader header, byte* ptr, int length, bool asReplica)
3030
{
@@ -39,13 +39,15 @@ internal unsafe bool TryAddTransactionOperation(AofHeader header, byte* ptr, int
3939
case AofEntryType.TxnCommit:
4040
if (aofProcessor.inFuzzyRegion)
4141
{
42-
fuzzyRegionBuffer.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
42+
// Buffer commit marker and operations batch when in fuzzy region
43+
fuzzyRegionOps.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
4344
txnBatchBuffer.Enqueue(batch);
4445
}
4546
else
4647
aofProcessor.ProcessTxn(batch, asReplica);
4748

48-
// We want to clear and remove in both cases to make space for next txn from session
49+
// We want to clear and remove in both cases to make space for next txn from session if any
50+
// Example (assume generated from same session): TxnStart1 CheckpointStart TxnCommit1 TxnStart2 CheckpointEnd TxnCommit2
4951
ClearSessionTxn();
5052
break;
5153
case AofEntryType.StoredProcedure:
@@ -85,18 +87,27 @@ void ClearSessionTxn()
8587
return true;
8688
}
8789

88-
internal void ProcessBufferedRecords(long storeVersion, bool asReplica)
90+
/// <summary>
91+
/// Process fuzzy region operations if any
92+
/// </summary>
93+
/// <param name="storeVersion"></param>
94+
/// <param name="asReplica"></param>
95+
internal void ProcessFuzzyRegionOperations(long storeVersion, bool asReplica)
8996
{
90-
if (fuzzyRegionBuffer.Count > 0)
91-
logger?.LogInformation("Replaying {fuzzyRegionBufferCount} records from fuzzy region for checkpoint {newVersion}", fuzzyRegionBuffer.Count, storeVersion);
92-
foreach (var entry in fuzzyRegionBuffer)
97+
if (fuzzyRegionOps.Count > 0)
98+
logger?.LogInformation("Replaying {fuzzyRegionBufferCount} records from fuzzy region for checkpoint {newVersion}", fuzzyRegionOps.Count, storeVersion);
99+
foreach (var entry in fuzzyRegionOps)
93100
{
94101
fixed (byte* entryPtr = entry)
95102
aofProcessor.ReplayOp(entryPtr, entry.Length, asReplica);
96103
}
97104
}
98105

99-
internal void ProcessNextTransactionBatch(bool asReplica)
106+
/// <summary>
107+
/// Process fuzzy region transactions in FIFO order
108+
/// </summary>
109+
/// <param name="asReplica"></param>
110+
internal void ProcessFuzzyRegionTransactions(bool asReplica)
100111
{
101112
var batch = txnBatchBuffer.Dequeue();
102113
aofProcessor.ProcessTxn(batch, asReplica);

0 commit comments

Comments
 (0)