Skip to content

Commit 9152ab2

Browse files
committed
fix txn replay for fuzzy region
1 parent 266e9d6 commit 9152ab2

File tree

2 files changed

+121
-54
lines changed

2 files changed

+121
-54
lines changed

libs/server/AOF/AofProcessor.cs

Lines changed: 15 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public sealed unsafe partial class AofProcessor
5050
/// </summary>
5151
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator> objectStoreBasicContext;
5252

53-
readonly Dictionary<int, List<byte[]>> inflightTxns;
5453
readonly byte[] buffer;
5554
readonly GCHandle handle;
5655
readonly byte* bufferPtr;
@@ -83,10 +82,11 @@ public AofProcessor(
8382
objectStoreInput.parseState = parseState;
8483
customProcInput.parseState = parseState;
8584

86-
inflightTxns = new Dictionary<int, List<byte[]>>();
8785
buffer = new byte[BufferSizeUtils.ServerBufferSize(new MaxSizeSettings())];
8886
handle = GCHandle.Alloc(buffer, GCHandleType.Pinned);
8987
bufferPtr = (byte*)handle.AddrOfPinnedObject();
88+
89+
aofReplayBuffer = new AofReplayBuffer(this, logger);
9090
this.logger = logger;
9191
}
9292

@@ -181,65 +181,30 @@ internal unsafe void ProcessAofRecord(IMemoryOwner<byte> entry, int length)
181181
/// 4) Finally, replay the buffered (v+1) entries.
182182
/// </summary>
183183
bool inFuzzyRegion = false;
184-
List<byte[]> fuzzyRegionBuffer = new();
185184

186185
/// <summary>
187186
/// Process AOF record
188187
/// </summary>
189188
public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplica, out bool isCheckpointStart)
190189
{
191-
AofHeader header = *(AofHeader*)ptr;
190+
var header = *(AofHeader*)ptr;
192191
isCheckpointStart = false;
193192

194-
if (inflightTxns.ContainsKey(header.sessionID))
195-
{
196-
switch (header.opType)
197-
{
198-
case AofEntryType.TxnAbort:
199-
inflightTxns[header.sessionID].Clear();
200-
inflightTxns.Remove(header.sessionID);
201-
break;
202-
case AofEntryType.TxnCommit:
203-
if (inFuzzyRegion)
204-
{
205-
fuzzyRegionBuffer.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
206-
}
207-
else
208-
{
209-
ProcessTxn(inflightTxns[header.sessionID], asReplica);
210-
inflightTxns[header.sessionID].Clear();
211-
inflightTxns.Remove(header.sessionID);
212-
}
213-
break;
214-
case AofEntryType.StoredProcedure:
215-
throw new GarnetException($"Unexpected AOF header operation type {header.opType} within transaction");
216-
default:
217-
inflightTxns[header.sessionID].Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
218-
break;
219-
}
193+
// Handle transactions
194+
if (aofReplayBuffer.TryAddTransactionOperation(header, ptr, length, asReplica))
220195
return;
221-
}
222196

223197
switch (header.opType)
224198
{
225-
case AofEntryType.TxnStart:
226-
inflightTxns[header.sessionID] = [];
227-
break;
228-
case AofEntryType.TxnAbort:
229-
case AofEntryType.TxnCommit:
230-
// We encountered a transaction end without start - this could happen because we truncated the AOF
231-
// after a checkpoint, and the transaction belonged to the previous version. It can safely
232-
// be ignored.
233-
break;
234199
case AofEntryType.CheckpointStartCommit:
235200
// Inform caller that we processed a checkpoint start marker so that it can record ReplicationCheckpointStartOffset if this is a replica replay
236201
isCheckpointStart = true;
237202
if (header.aofHeaderVersion > 1)
238203
{
239204
if (inFuzzyRegion)
240205
{
241-
logger?.LogInformation("Encountered new CheckpointStartCommit before prior CheckpointEndCommit. Clearing {fuzzyRegionBufferCount} records from previous fuzzy region", fuzzyRegionBuffer.Count);
242-
fuzzyRegionBuffer.Clear();
206+
logger?.LogInformation("Encountered new CheckpointStartCommit before prior CheckpointEndCommit. Clearing {fuzzyRegionBufferCount} records from previous fuzzy region", aofReplayBuffer.FuzzyRegionBufferCount);
207+
aofReplayBuffer.ClearFuzzyRegionBuffer();
243208
}
244209
inFuzzyRegion = true;
245210
}
@@ -268,17 +233,10 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic
268233
// Take checkpoint after the fuzzy region
269234
if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion)
270235
_ = storeWrapper.TakeCheckpoint(false, logger);
236+
271237
// Process buffered records
272-
if (fuzzyRegionBuffer.Count > 0)
273-
{
274-
logger?.LogInformation("Replaying {fuzzyRegionBufferCount} records from fuzzy region for checkpoint {newVersion}", fuzzyRegionBuffer.Count, storeWrapper.store.CurrentVersion);
275-
}
276-
foreach (var entry in fuzzyRegionBuffer)
277-
{
278-
fixed (byte* entryPtr = entry)
279-
ReplayOp(entryPtr, entry.Length, asReplica);
280-
}
281-
fuzzyRegionBuffer.Clear();
238+
aofReplayBuffer.ProcessBufferedRecords(storeWrapper.store.CurrentVersion, asReplica);
239+
aofReplayBuffer.ClearFuzzyRegionBuffer();
282240
}
283241
}
284242
break;
@@ -330,7 +288,7 @@ private unsafe void ProcessTxn(List<byte[]> operations, bool asReplica)
330288

331289
private unsafe bool ReplayOp(byte* entryPtr, int length, bool replayAsReplica)
332290
{
333-
AofHeader header = *(AofHeader*)entryPtr;
291+
var header = *(AofHeader*)entryPtr;
334292

335293
// Skips (1) entries with versions that were part of prior checkpoint; and (2) future entries in fuzzy region
336294
if (SkipRecord(entryPtr, length, replayAsReplica)) return false;
@@ -358,6 +316,9 @@ private unsafe bool ReplayOp(byte* entryPtr, int length, bool replayAsReplica)
358316
case AofEntryType.StoredProcedure:
359317
RunStoredProc(header.procedureId, customProcInput, entryPtr);
360318
break;
319+
case AofEntryType.TxnCommit:
320+
aofReplayBuffer.ProcessNextTransactionBatch(replayAsReplica);
321+
break;
361322
default:
362323
throw new GarnetException($"Unknown AOF header operation type {header.opType}");
363324
}
@@ -509,7 +470,7 @@ bool BufferNewVersionRecord(AofHeader header, byte* entryPtr, int length)
509470
{
510471
if (IsNewVersionRecord(header))
511472
{
512-
fuzzyRegionBuffer.Add(new ReadOnlySpan<byte>(entryPtr, length).ToArray());
473+
aofReplayBuffer.TryAddOperation(entryPtr, length);
513474
return true;
514475
}
515476
return false;

libs/server/AOF/AofReplayBuffer.cs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using Garnet.common;
7+
using Microsoft.Extensions.Logging;
8+
9+
namespace Garnet.server
10+
{
11+
public sealed unsafe partial class AofProcessor
12+
{
13+
readonly AofReplayBuffer aofReplayBuffer;
14+
15+
public class AofReplayBuffer(AofProcessor aofProcessor, ILogger logger = null)
16+
{
17+
readonly AofProcessor aofProcessor = aofProcessor;
18+
readonly List<byte[]> fuzzyRegionBuffer = [];
19+
readonly Queue<List<byte[]>> txnBatchBuffer = [];
20+
readonly Dictionary<int, List<byte[]>> activeTxns = [];
21+
readonly ILogger logger = logger;
22+
23+
internal int FuzzyRegionBufferCount => fuzzyRegionBuffer.Count;
24+
25+
internal void ClearFuzzyRegionBuffer() => fuzzyRegionBuffer.Clear();
26+
27+
internal unsafe void TryAddOperation(byte* ptr, int length) => fuzzyRegionBuffer.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
28+
29+
internal unsafe bool TryAddTransactionOperation(AofHeader header, byte* ptr, int length, bool asReplica)
30+
{
31+
// First try to process this as an existing transaction
32+
if (activeTxns.TryGetValue(header.sessionID, out var batch))
33+
{
34+
switch (header.opType)
35+
{
36+
case AofEntryType.TxnAbort:
37+
ClearSessionTxn();
38+
break;
39+
case AofEntryType.TxnCommit:
40+
if (aofProcessor.inFuzzyRegion)
41+
{
42+
fuzzyRegionBuffer.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
43+
txnBatchBuffer.Enqueue(batch);
44+
}
45+
else
46+
aofProcessor.ProcessTxn(batch, asReplica);
47+
48+
// We want to clear and remove in both cases to make space for next txn from session
49+
ClearSessionTxn();
50+
break;
51+
case AofEntryType.StoredProcedure:
52+
throw new GarnetException($"Unexpected AOF header operation type {header.opType} within transaction");
53+
default:
54+
batch.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
55+
break;
56+
}
57+
58+
void ClearSessionTxn()
59+
{
60+
activeTxns[header.sessionID].Clear();
61+
activeTxns.Remove(header.sessionID);
62+
}
63+
64+
return true;
65+
}
66+
67+
// See if you have detected a txn
68+
switch (header.opType)
69+
{
70+
case AofEntryType.TxnStart:
71+
activeTxns[header.sessionID] = [];
72+
break;
73+
case AofEntryType.TxnAbort:
74+
case AofEntryType.TxnCommit:
75+
// We encountered a transaction end without start - this could happen because we truncated the AOF
76+
// after a checkpoint, and the transaction belonged to the previous version. It can safely
77+
// be ignored.
78+
break;
79+
default:
80+
// Continue processing
81+
return false;
82+
}
83+
84+
// Processed this record succesfully
85+
return true;
86+
}
87+
88+
internal void ProcessBufferedRecords(long storeVersion, bool asReplica)
89+
{
90+
if (fuzzyRegionBuffer.Count > 0)
91+
logger?.LogInformation("Replaying {fuzzyRegionBufferCount} records from fuzzy region for checkpoint {newVersion}", fuzzyRegionBuffer.Count, storeVersion);
92+
foreach (var entry in fuzzyRegionBuffer)
93+
{
94+
fixed (byte* entryPtr = entry)
95+
aofProcessor.ReplayOp(entryPtr, entry.Length, asReplica);
96+
}
97+
}
98+
99+
internal void ProcessNextTransactionBatch(bool asReplica)
100+
{
101+
var batch = txnBatchBuffer.Dequeue();
102+
aofProcessor.ProcessTxn(batch, asReplica);
103+
}
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)