Skip to content
507 changes: 234 additions & 273 deletions libs/server/AOF/AofProcessor.cs

Large diffs are not rendered by default.

68 changes: 68 additions & 0 deletions libs/server/AOF/ReplayCoordinator/AofReplayContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using Garnet.common;
using Garnet.networking;

namespace Garnet.server
{
/// <summary>
/// Replay state for a sublog (one instance for each sublog)
/// </summary>
public class AofReplayContext
{
    /// <summary>
    /// Raw AOF entries (including transaction commit markers) buffered for later replay
    /// </summary>
    public readonly List<byte[]> fuzzyRegionOps = new();

    /// <summary>
    /// Transaction groups buffered alongside their commit markers, consumed in FIFO order
    /// </summary>
    public readonly Queue<TransactionGroup> txnGroupBuffer = new();

    /// <summary>
    /// Transaction groups that are currently being assembled, keyed by session ID
    /// </summary>
    public readonly Dictionary<int, TransactionGroup> activeTxns = new();

    // Reusable inputs; the constructor assigns this context's parseState to each of them
    public readonly RawStringInput storeInput;
    public readonly ObjectInput objectStoreInput;
    public CustomProcedureInput customProcInput;
    public readonly SessionParseState parseState;

    // Pinned buffer sized by BufferSizeUtils.ServerBufferSize for the default MaxSizeSettings
    public readonly byte[] objectOutputBuffer;

    // Output handed to stored procedure replay
    public MemoryResult<byte> output;

    /// <summary>
    /// The fuzzy region of the AOF is the region between the checkpoint start and end commit markers.
    /// This region can contain entries in both (v) and (v+1) versions. The processing logic is:
    /// 1) Process (v) entries as is.
    /// 2) Store away the (v+1) entries in a buffer.
    /// 3) At the end of the fuzzy region, take a checkpoint.
    /// 4) Finally, replay the buffered (v+1) entries.
    /// </summary>
    public bool inFuzzyRegion;

    /// <summary>
    /// Initialize the parse state, attach it to every reusable input and allocate the pinned output buffer
    /// </summary>
    public AofReplayContext()
    {
        // The parse state has to be initialized before it is handed to the inputs
        parseState.Initialize();
        storeInput.parseState = parseState;
        objectStoreInput.parseState = parseState;
        customProcInput.parseState = parseState;

        var bufferSize = BufferSizeUtils.ServerBufferSize(new MaxSizeSettings());
        objectOutputBuffer = GC.AllocateArray<byte>(bufferSize, pinned: true);
    }

    /// <summary>
    /// Register a new, empty transaction group for the given session.
    /// Any group already registered under the same session ID is replaced.
    /// </summary>
    /// <param name="sessionID">ID of the session that started the transaction</param>
    public void AddTransactionGroup(int sessionID)
    {
        activeTxns[sessionID] = new TransactionGroup();
    }

    /// <summary>
    /// Buffer a transaction group together with a copy of its commit marker
    /// </summary>
    /// <param name="group">Transaction group to enqueue (stored by reference, not copied)</param>
    /// <param name="commitMarker">Bytes of the commit marker entry; copied into a new array</param>
    public void AddToFuzzyRegionBuffer(TransactionGroup group, ReadOnlySpan<byte> commitMarker)
    {
        // The commit marker is recorded as a regular fuzzy region operation
        var markerCopy = commitMarker.ToArray();
        fuzzyRegionOps.Add(markerCopy);

        // The group itself waits in the FIFO queue
        txnGroupBuffer.Enqueue(group);
    }
}
}
264 changes: 264 additions & 0 deletions libs/server/AOF/ReplayCoordinator/AofReplayCoordinator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Runtime.CompilerServices;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.server
{
using MainStoreAllocator = SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>;
using MainStoreFunctions = StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>;

using ObjectStoreAllocator = GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>;
using ObjectStoreFunctions = StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>;

public sealed unsafe partial class AofProcessor
{
public class AofReplayCoordinator(AofProcessor aofProcessor, ILogger logger = null) : IDisposable
{
// Processor that created this coordinator; its store contexts and server session are used during replay
readonly AofProcessor aofProcessor = aofProcessor;
// Replay state (fuzzy region buffers, active transactions, reusable inputs) created once per coordinator
readonly AofReplayContext aofReplayContext = InitializeReplayContext();
/// <summary>
/// Get the replay context held by this coordinator
/// </summary>
/// <returns>The same <see cref="AofReplayContext"/> instance on every call</returns>
public AofReplayContext GetReplayContext() => aofReplayContext;
// Optional logger (defaults to null in the primary constructor)
readonly ILogger logger = logger;

/// <summary>
/// Create the replay context for a coordinator
/// </summary>
/// <returns>A newly constructed <see cref="AofReplayContext"/></returns>
internal static AofReplayContext InitializeReplayContext() => new();

/// <summary>
/// Dispose the memory owner held by the replay context output, if there is one
/// </summary>
public void Dispose()
    => aofReplayContext.output.MemoryOwner?.Dispose();

/// <summary>
/// Number of entries currently held in the fuzzy region operation buffer
/// </summary>
/// <returns>Count of buffered fuzzy region entries</returns>
internal int FuzzyRegionBufferCount()
{
    return aofReplayContext.fuzzyRegionOps.Count;
}

/// <summary>
/// Remove every entry from the fuzzy region operation buffer.
/// The queue of buffered transaction groups is not touched by this method.
/// </summary>
internal void ClearFuzzyRegionBuffer()
{
    aofReplayContext.fuzzyRegionOps.Clear();
}

/// <summary>
/// Append a single operation to the fuzzy region buffer
/// </summary>
/// <param name="entry">Bytes of the AOF entry; they are copied into a new array before being stored</param>
internal unsafe void AddFuzzyRegionOperation(ReadOnlySpan<byte> entry)
{
    var entryCopy = entry.ToArray();
    aofReplayContext.fuzzyRegionOps.Add(entryCopy);
}

/// <summary>
/// Route an AOF entry through transaction handling. Depending on the entry type and on whether the
/// session already has an active transaction, this method will perform one of the following:
/// 1. TxnStart: Create a new transaction group for the session.
/// 2. TxnCommit: Replay the transaction group, or buffer it when we are in the fuzzy region.
/// 3. TxnAbort: Discard the transaction group of the session.
/// 4. Default (session has an active transaction): Add a copy of the entry to the transaction group.
/// TxnCommit/TxnAbort entries for sessions without an active transaction are consumed and ignored.
/// </summary>
/// <param name="ptr">Pointer to the AOF entry, which starts with an <see cref="AofHeader"/></param>
/// <param name="length">Length of the AOF entry in bytes</param>
/// <param name="asReplica">Forwarded to transaction replay; selects the locking replay path</param>
/// <returns>True if the entry was consumed as a transaction operation, otherwise false (the caller should continue processing it)</returns>
/// <exception cref="GarnetException">
/// Thrown when a TxnStart or StoredProcedure entry arrives for a session that already has an active transaction
/// </exception>
internal unsafe bool AddOrReplayTransactionOperation(byte* ptr, int length, bool asReplica)
{
    var header = *(AofHeader*)ptr;
    var activeTxns = aofReplayContext.activeTxns;

    // First try to process this as part of an existing transaction of the session
    if (activeTxns.TryGetValue(header.sessionID, out var group))
    {
        switch (header.opType)
        {
            case AofEntryType.TxnStart:
                throw new GarnetException("No nested transactions expected");
            case AofEntryType.TxnAbort:
                // Aborted transaction: drop whatever was collected so far
                group.Clear();
                _ = activeTxns.Remove(header.sessionID);
                break;
            case AofEntryType.TxnCommit:
                if (aofReplayContext.inFuzzyRegion)
                {
                    // If in fuzzy region we want to record the commit marker and
                    // buffer the transaction group for later replay
                    aofReplayContext.AddToFuzzyRegionBuffer(group, new ReadOnlySpan<byte>(ptr, length));

                    // The buffer now holds a reference to this very group instance, so it must NOT be
                    // cleared here; clearing it would leave an empty group to replay after the
                    // fuzzy region. Only detach it from the session.
                    _ = activeTxns.Remove(header.sessionID);
                }
                else
                {
                    // Otherwise process transaction group immediately
                    ProcessTransactionGroup(ptr, asReplica, group);

                    // The group has been replayed; release its operations and
                    // make space for the next transaction of this session
                    group.Clear();
                    _ = activeTxns.Remove(header.sessionID);
                }
                break;
            case AofEntryType.StoredProcedure:
                throw new GarnetException($"Unexpected AOF header operation type {header.opType} within transaction");
            default:
                // Regular operation inside a transaction: keep a copy until commit or abort
                group.operations.Add(new ReadOnlySpan<byte>(ptr, length).ToArray());
                break;
        }

        return true;
    }

    // No active transaction for this session; see if this entry is a transaction marker
    switch (header.opType)
    {
        case AofEntryType.TxnStart:
            aofReplayContext.AddTransactionGroup(header.sessionID);
            break;
        case AofEntryType.TxnAbort:
        case AofEntryType.TxnCommit:
            // We encountered a transaction end without start - this could happen because we truncated the AOF
            // after a checkpoint, and the transaction belonged to the previous version. It can safely
            // be ignored.
            break;
        default:
            // Not a transaction operation, continue processing
            return false;
    }

    // Processed this record successfully
    return true;
}

/// <summary>
/// Replay every entry held in the fuzzy region operation buffer, in insertion order.
/// The buffer itself is not cleared by this method.
/// </summary>
/// <param name="storeVersion">Store version, used only for the log message</param>
/// <param name="asReplica">Forwarded to the replay of each entry</param>
internal void ProcessFuzzyRegionOperations(long storeVersion, bool asReplica)
{
    var bufferedOps = aofReplayContext.fuzzyRegionOps;
    var bufferedCount = bufferedOps.Count;

    // Only log when there is something to replay
    if (bufferedCount > 0)
    {
        logger?.LogInformation("Replaying sublogIdx: {fuzzyRegionBufferCount} records from fuzzy region for checkpoint {newVersion}", bufferedCount, storeVersion);
    }

    foreach (var bufferedOp in bufferedOps)
    {
        // Pin the managed copy for the duration of the replay call
        fixed (byte* bufferedOpPtr = bufferedOp)
        {
            _ = aofProcessor.ReplayOp(aofProcessor.basicContext, aofProcessor.objectStoreBasicContext, bufferedOpPtr, bufferedOp.Length, asReplica);
        }
    }
}

/// <summary>
/// Replay the oldest buffered fuzzy region transaction group (FIFO order).
/// Dequeuing from an empty buffer throws <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="ptr">Pointer forwarded to the transaction group replay</param>
/// <param name="asReplica">Forwarded to the transaction group replay</param>
internal void ProcessFuzzyRegionTransactionGroup(byte* ptr, bool asReplica)
    => ProcessTransactionGroup(ptr, asReplica, aofReplayContext.txnGroupBuffer.Dequeue());

/// <summary>
/// Replay every operation of the provided transaction group.
/// As a replica the keys of the group are locked exclusively and the operations run inside an
/// internal transaction; otherwise the operations are replayed directly on the basic contexts.
/// </summary>
/// <param name="ptr">Pointer to the AOF entry that triggered the replay (not read by this method)</param>
/// <param name="asReplica">True to replay under transaction locks, false to replay without locking</param>
/// <param name="txnGroup">Transaction group whose operations are replayed</param>
/// <exception cref="GarnetException">
/// Thrown on the replica path when the group holds an entry that is not a store or object store upsert/RMW/delete
/// </exception>
internal void ProcessTransactionGroup(byte* ptr, bool asReplica, TransactionGroup txnGroup)
{
    if (asReplica)
    {
        var txnManager = aofProcessor.respServerSession.txnManager;

        // Register the key of every buffered operation so it gets locked
        CollectKeysToLock(txnManager, txnGroup);

        // Start transaction
        _ = txnManager.Run(internal_txn: true);

        // Replay the group through the lockable contexts of the transaction manager
        ReplayGroupOperations(aofProcessor, txnManager.LockableContext, txnManager.ObjectStoreLockableContext, txnGroup, asReplica);

        // Commit (NOTE: need to ensure that we do not write to log here)
        txnManager.Commit(true);
    }
    else
    {
        // If recovering reads will not expose partial transactions so we can replay without locking.
        // Also we don't have to synchronize replay of sublogs because write ordering has been established at the time of enqueue.
        ReplayGroupOperations(aofProcessor, aofProcessor.basicContext, aofProcessor.objectStoreBasicContext, txnGroup, asReplica);
    }

    // Walk the operations of the group and add each key to the lock set with an exclusive lock
    static unsafe void CollectKeysToLock(TransactionManager txnManager, TransactionGroup txnGroup)
    {
        foreach (var operation in txnGroup.operations)
        {
            fixed (byte* operationPtr = operation)
            {
                var opHeader = *(AofHeader*)operationPtr;

                // Decide which store the key belongs to; anything else is not valid inside a transaction
                var isObject = opHeader.opType switch
                {
                    AofEntryType.StoreUpsert or AofEntryType.StoreRMW or AofEntryType.StoreDelete => false,
                    AofEntryType.ObjectStoreUpsert or AofEntryType.ObjectStoreRMW or AofEntryType.ObjectStoreDelete => true,
                    _ => throw new GarnetException($"Invalid replay operation {opHeader.opType} within transaction"),
                };

                // For all accepted operation types the key is read right after the header
                ref var key = ref Unsafe.AsRef<SpanByte>(operationPtr + HeaderSize());

                // Add key to the lockset
                txnManager.SaveKeyEntryToLock(ArgSlice.FromPinnedSpan(key.AsReadOnlySpan()), isObject: isObject, LockType.Exclusive);
            }
        }
    }

    // Replay each operation of the group, in order, against the supplied contexts
    static void ReplayGroupOperations<TContext, TObjectContext>(AofProcessor aofProcessor, TContext context, TObjectContext objectContext, TransactionGroup txnGroup, bool asReplica)
        where TContext : ITsavoriteContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator>
        where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
    {
        foreach (var operation in txnGroup.operations)
        {
            fixed (byte* operationPtr = operation)
            {
                _ = aofProcessor.ReplayOp(context, objectContext, operationPtr, operation.Length, asReplica: asReplica);
            }
        }
    }
}

/// <summary>
/// Replay a stored procedure entry: deserialize its input from the AOF entry into the replay
/// context and run the procedure through the server session with isRecovering set.
/// </summary>
/// <param name="id">Procedure identifier passed to RunTransactionProc</param>
/// <param name="ptr">Pointer to the AOF entry; the serialized input is read starting HeaderSize() bytes in</param>
internal void ReplayStoredProc(byte id, byte* ptr)
{
    // Skip the AOF header to reach the serialized procedure input
    var curr = ptr + HeaderSize();

    // Reconstructing CustomProcedureInput
    _ = aofReplayContext.customProcInput.DeserializeFrom(curr);

    // Run the stored procedure with the reconstructed input.
    // The context's output is passed by reference directly instead of through a local copy, so that
    // whatever the procedure stores in it (e.g. a memory owner) stays reachable from the context
    // and can be released by Dispose.
    // NOTE(review): this matters when MemoryResult<byte> is a value type, which its unguarded use
    // in Dispose suggests - confirm.
    _ = aofProcessor.respServerSession.RunTransactionProc(id, ref aofReplayContext.customProcInput, ref aofReplayContext.output, isRecovering: true);
}
}
}
}
23 changes: 23 additions & 0 deletions libs/server/AOF/ReplayCoordinator/TransactionGroup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Collections.Generic;

namespace Garnet.server
{
/// <summary>
/// Holds the operations that belong to a single transaction
/// </summary>
public class TransactionGroup
{
    /// <summary>
    /// Buffered transaction operations, one byte array per operation
    /// </summary>
    public List<byte[]> operations = new();

    /// <summary>
    /// Remove all buffered operations from this transaction group
    /// </summary>
    public void Clear()
    {
        operations.Clear();
    }
}
}
29 changes: 29 additions & 0 deletions test/Garnet.test.cluster/ClusterTestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,35 @@ public void PopulatePrimary(
}
}

/// <summary>
/// Populate the primary using one of the existing populate helpers:
/// plain key-value pairs when objects are disabled, otherwise objects or (when requested) RMW operations.
/// </summary>
/// <param name="disableObjects">When true, populate with plain key-value pairs and ignore <paramref name="perforRMW"/></param>
/// <param name="keyLength">Key length forwarded to the populate helper</param>
/// <param name="kvpairCount">Number of pairs forwarded to the populate helper</param>
/// <param name="primaryIndex">Index of the primary node to populate</param>
/// <param name="addCount">Forwarded to the RMW populate helper only</param>
/// <param name="perforRMW">When objects are enabled, populate through RMW instead of objects</param>
public void SimplePopulateDB(bool disableObjects, int keyLength, int kvpairCount, int primaryIndex, int addCount = 0, bool perforRMW = false)
{
    // Objects disabled: plain key-value population
    if (disableObjects)
    {
        PopulatePrimary(ref kvPairs, keyLength, kvpairCount, primaryIndex);
        return;
    }

    // NOTE(review): the RMW path fills kvPairs, while SimpleValidateDB with objects enabled checks
    // kvPairsObj - confirm this combination is intended.
    if (perforRMW)
    {
        PopulatePrimaryRMW(ref kvPairs, keyLength, kvpairCount, primaryIndex, addCount);
        return;
    }

    PopulatePrimaryWithObjects(ref kvPairsObj, keyLength, kvpairCount, primaryIndex);
}

/// <summary>
/// Validate the replica contents using the collection that matches the store under test
/// </summary>
/// <param name="disableObjects">When true, validate the key-value collection; otherwise validate the object collection</param>
/// <param name="replicaIndex">Index of the replica node to validate against</param>
public void SimpleValidateDB(bool disableObjects, int replicaIndex)
{
    // Objects enabled: compare the object collection
    if (!disableObjects)
    {
        ValidateNodeObjects(ref kvPairsObj, replicaIndex);
        return;
    }

    // Objects disabled: compare the plain key-value collection
    ValidateKVCollectionAgainstReplica(ref kvPairs, replicaIndex);
}

public void PopulatePrimaryRMW(ref Dictionary<string, int> kvPairs, int keyLength, int kvpairCount, int primaryIndex, int addCount, int[] slotMap = null, bool incrementalSnapshots = false, int ckptNode = 0, int randomSeed = -1)
{
if (randomSeed != -1) clusterTestUtils.InitRandom(randomSeed);
Expand Down
Loading
Loading