-
Notifications
You must be signed in to change notification settings - Fork 620
fix txn replay for fuzzy region #1412
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vazois
wants to merge
13
commits into
main
Choose a base branch
from
vazois/aofprocessor-txn
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 9 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
9152ab2
fix txn replay for fuzzy region
vazois a34033c
renaming and comments
vazois 30935d2
fix test waiting logic
vazois a7cee92
add aof replay coordinator
vazois 955dd23
augment tests
vazois 491d740
cleanup and comments
vazois d28c7e1
Merge remote-tracking branch 'upstream/main' into vazois/aofprocessor…
vazois 8cf81ec
fix formatting
vazois df68b7a
Merge remote-tracking branch 'upstream/main' into vazois/aofprocessor…
vazois 8b631ab
cleanup
vazois 112f439
revert dispose at recover to avoid breaking tests
vazois c541496
Merge branch 'main' into vazois/aofprocessor-txn
vazois 04734b2
adding comments and minor cleanup
vazois File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT license. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using Garnet.common; | ||
| using Garnet.networking; | ||
|
|
||
| namespace Garnet.server | ||
| { | ||
| /// <summary> | ||
| /// Sublog replay buffer (one for each sublog) | ||
| /// </summary> | ||
| public class AofReplayContext | ||
| { | ||
| public readonly List<byte[]> fuzzyRegionOps = []; | ||
| public readonly Queue<TransactionGroup> txnGroupBuffer = []; | ||
| public readonly Dictionary<int, TransactionGroup> activeTxns = []; | ||
vazois marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| public readonly RawStringInput storeInput; | ||
| public readonly ObjectInput objectStoreInput; | ||
| public CustomProcedureInput customProcInput; | ||
| public readonly SessionParseState parseState; | ||
|
|
||
| public readonly byte[] objectOutputBuffer; | ||
|
|
||
| public MemoryResult<byte> output; | ||
|
|
||
| /// <summary> | ||
| /// Fuzzy region of AOF is the region between the checkpoint start and end commit markers. | ||
| /// This regions can contain entries in both (v) and (v+1) versions. The processing logic is: | ||
| /// 1) Process (v) entries as is. | ||
| /// 2) Store aware the (v+1) entries in a buffer. | ||
vazois marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| /// 3) At the end of the fuzzy region, take a checkpoint | ||
| /// 4) Finally, replay the buffered (v+1) entries. | ||
| /// </summary> | ||
| public bool inFuzzyRegion = false; | ||
|
|
||
| public AofReplayContext() | ||
| { | ||
| parseState.Initialize(); | ||
| storeInput.parseState = parseState; | ||
| objectStoreInput.parseState = parseState; | ||
| customProcInput.parseState = parseState; | ||
| objectOutputBuffer = GC.AllocateArray<byte>(BufferSizeUtils.ServerBufferSize(new MaxSizeSettings()), pinned: true); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Add transaction group to this replay buffer | ||
| /// </summary> | ||
| /// <param name="sessionID"></param> | ||
| public void AddTransactionGroup(int sessionID) | ||
| => activeTxns[sessionID] = new(); | ||
|
|
||
| /// <summary> | ||
| /// Add transaction group to fuzzy region buffer | ||
| /// </summary> | ||
| /// <param name="group"></param> | ||
| /// <param name="commitMarker"></param> | ||
| public void AddToFuzzyRegionBuffer(TransactionGroup group, ReadOnlySpan<byte> commitMarker) | ||
| { | ||
| // Add commit marker operation | ||
| fuzzyRegionOps.Add(commitMarker.ToArray()); | ||
| // Enqueue transaction group | ||
| txnGroupBuffer.Enqueue(group); | ||
| } | ||
| } | ||
| } | ||
264 changes: 264 additions & 0 deletions
264
libs/server/AOF/ReplayCoordinator/AofReplayCoordinator.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,264 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT license. | ||
|
|
||
| using System; | ||
| using System.Runtime.CompilerServices; | ||
| using Garnet.common; | ||
| using Microsoft.Extensions.Logging; | ||
| using Tsavorite.core; | ||
|
|
||
| namespace Garnet.server | ||
| { | ||
| using MainStoreAllocator = SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>; | ||
| using MainStoreFunctions = StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>; | ||
|
|
||
| using ObjectStoreAllocator = GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>; | ||
| using ObjectStoreFunctions = StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>; | ||
|
|
||
| public sealed unsafe partial class AofProcessor | ||
| { | ||
| public class AofReplayCoordinator(AofProcessor aofProcessor, ILogger logger = null) : IDisposable | ||
| { | ||
| readonly AofProcessor aofProcessor = aofProcessor; | ||
| readonly AofReplayContext aofReplayContext = InitializeReplayContext(); | ||
| public AofReplayContext GetReplayContext() => aofReplayContext; | ||
| readonly ILogger logger = logger; | ||
|
|
||
| internal static AofReplayContext InitializeReplayContext() | ||
| { | ||
| return new AofReplayContext(); | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| aofReplayContext.output.MemoryOwner?.Dispose(); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Get fuzzy region buffer count | ||
| /// </summary> | ||
| /// <returns></returns> | ||
| internal int FuzzyRegionBufferCount() => aofReplayContext.fuzzyRegionOps.Count; | ||
|
|
||
| /// <summary> | ||
| /// Clear fuzzy region buffer | ||
| /// </summary> | ||
| internal void ClearFuzzyRegionBuffer() => aofReplayContext.fuzzyRegionOps.Clear(); | ||
|
|
||
| /// <summary> | ||
| /// Add single operation to fuzzy region buffer | ||
| /// </summary> | ||
| /// <param name="entry"></param> | ||
| internal unsafe void AddFuzzyRegionOperation(ReadOnlySpan<byte> entry) => aofReplayContext.fuzzyRegionOps.Add(entry.ToArray()); | ||
|
|
||
| /// <summary> | ||
| /// This method will perform one of the following | ||
| /// 1. TxnStart: Create a new transaction group | ||
| /// 2. TxnCommit: Replay or buffer transaction group depending if we are in fuzzyRegion. | ||
| /// 3. TxnAbort: Clear corresponding sublog replay buffer. | ||
| /// 4. Default: Add an operation to an existing transaction group | ||
| /// </summary> | ||
| /// <param name="ptr"></param> | ||
| /// <param name="length"></param> | ||
| /// <param name="asReplica"></param> | ||
| /// <returns>Returns true if a txn operation was processed and added otherwise false</returns> | ||
| /// <exception cref="GarnetException"></exception> | ||
| internal unsafe bool AddOrReplayTransactionOperation(byte* ptr, int length, bool asReplica) | ||
| { | ||
| var header = *(AofHeader*)ptr; | ||
| var replayContext = GetReplayContext(); | ||
| // First try to process this as an existing transaction | ||
| if (aofReplayContext.activeTxns.TryGetValue(header.sessionID, out var group)) | ||
| { | ||
| switch (header.opType) | ||
| { | ||
| case AofEntryType.TxnStart: | ||
| throw new GarnetException("No nested transactions expected"); | ||
| case AofEntryType.TxnAbort: | ||
| ClearSessionTxn(); | ||
| break; | ||
| case AofEntryType.TxnCommit: | ||
| if (replayContext.inFuzzyRegion) | ||
| { | ||
| // If in fuzzy region we want to record the commit marker and | ||
| // buffer the transaction group for later replay | ||
| var commitMarker = new ReadOnlySpan<byte>(ptr, length); | ||
| aofReplayContext.AddToFuzzyRegionBuffer(group, commitMarker); | ||
| } | ||
| else | ||
| { | ||
| // Otherwise process transaction group immediately | ||
| ProcessTransactionGroup(ptr, asReplica, group); | ||
| } | ||
|
|
||
| // We want to clear and remove in both cases to make space for next txn from session | ||
| ClearSessionTxn(); | ||
| break; | ||
| case AofEntryType.StoredProcedure: | ||
| throw new GarnetException($"Unexpected AOF header operation type {header.opType} within transaction"); | ||
| default: | ||
| group.operations.Add(new ReadOnlySpan<byte>(ptr, length).ToArray()); | ||
| break; | ||
| } | ||
|
|
||
| void ClearSessionTxn() | ||
| { | ||
| aofReplayContext.activeTxns[header.sessionID].Clear(); | ||
| aofReplayContext.activeTxns.Remove(header.sessionID); | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| // See if you have detected a txn | ||
| switch (header.opType) | ||
| { | ||
| case AofEntryType.TxnStart: | ||
| aofReplayContext.AddTransactionGroup(header.sessionID); | ||
| break; | ||
| case AofEntryType.TxnAbort: | ||
| case AofEntryType.TxnCommit: | ||
| // We encountered a transaction end without start - this could happen because we truncated the AOF | ||
| // after a checkpoint, and the transaction belonged to the previous version. It can safely | ||
| // be ignored. | ||
| break; | ||
| default: | ||
| // Continue processing | ||
| return false; | ||
| } | ||
|
|
||
| // Processed this record succesfully | ||
vazois marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return true; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Process fuzzy region operations if any | ||
| /// </summary> | ||
| /// <param name="storeVersion"></param> | ||
| /// <param name="asReplica"></param> | ||
| internal void ProcessFuzzyRegionOperations(long storeVersion, bool asReplica) | ||
| { | ||
| var fuzzyRegionOps = aofReplayContext.fuzzyRegionOps; | ||
| if (fuzzyRegionOps.Count > 0) | ||
| logger?.LogInformation("Replaying sublogIdx: {fuzzyRegionBufferCount} records from fuzzy region for checkpoint {newVersion}", fuzzyRegionOps.Count, storeVersion); | ||
vazois marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| foreach (var entry in fuzzyRegionOps) | ||
| { | ||
| fixed (byte* entryPtr = entry) | ||
| _ = aofProcessor.ReplayOp(aofProcessor.basicContext, aofProcessor.objectStoreBasicContext, entryPtr, entry.Length, asReplica); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Process fuzzy region transaction groups | ||
| /// </summary> | ||
| /// <param name="ptr"></param> | ||
| /// <param name="asReplica"></param> | ||
| internal void ProcessFuzzyRegionTransactionGroup(byte* ptr, bool asReplica) | ||
| { | ||
| // Process transaction groups in FIFO order | ||
vazois marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| var txnGroup = aofReplayContext.txnGroupBuffer.Dequeue(); | ||
| ProcessTransactionGroup(ptr, asReplica, txnGroup); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Process provided transaction group | ||
| /// </summary> | ||
| /// <param name="asReplica"></param> | ||
| /// <param name="txnGroup"></param> | ||
| internal void ProcessTransactionGroup(byte* ptr, bool asReplica, TransactionGroup txnGroup) | ||
| { | ||
| if (!asReplica) | ||
| { | ||
| // If recovering reads will not expose partial transactions so we can replay without locking. | ||
| // Also we don't have to synchronize replay of sublogs because write ordering has been established at the time of enqueue. | ||
| ProcessTransactionGroupOperations(aofProcessor, aofProcessor.basicContext, aofProcessor.objectStoreBasicContext, txnGroup, asReplica); | ||
| } | ||
| else | ||
| { | ||
| var txnManager = aofProcessor.respServerSession.txnManager; | ||
|
|
||
| // Start by saving transaction keys for locking | ||
| SaveTransactionGroupKeysToLock(txnManager, txnGroup); | ||
|
|
||
| // Start transaction | ||
| _ = txnManager.Run(internal_txn: true); | ||
|
|
||
| // Process in parallel transaction group | ||
| ProcessTransactionGroupOperations(aofProcessor, txnManager.LockableContext, txnManager.ObjectStoreLockableContext, txnGroup, asReplica); | ||
|
|
||
| // Commit (NOTE: need to ensure that we do not write to log here) | ||
| txnManager.Commit(true); | ||
| } | ||
|
|
||
| // Helper to iterate of transaction keys and add them to lockset | ||
| static unsafe void SaveTransactionGroupKeysToLock(TransactionManager txnManager, TransactionGroup txnGroup) | ||
| { | ||
| foreach (var entry in txnGroup.operations) | ||
| { | ||
| ref var key = ref Unsafe.NullRef<SpanByte>(); | ||
| fixed (byte* entryPtr = entry) | ||
| { | ||
| var header = *(AofHeader*)entryPtr; | ||
| var isObject = false; | ||
| switch (header.opType) | ||
| { | ||
| case AofEntryType.StoreUpsert: | ||
| case AofEntryType.StoreRMW: | ||
| case AofEntryType.StoreDelete: | ||
| key = ref Unsafe.AsRef<SpanByte>(entryPtr + HeaderSize()); | ||
| isObject = false; | ||
| break; | ||
| case AofEntryType.ObjectStoreUpsert: | ||
| case AofEntryType.ObjectStoreRMW: | ||
| case AofEntryType.ObjectStoreDelete: | ||
| key = ref Unsafe.AsRef<SpanByte>(entryPtr + HeaderSize()); | ||
| isObject = true; | ||
| break; | ||
| default: | ||
| throw new GarnetException($"Invalid replay operation {header.opType} within transaction"); | ||
| } | ||
|
|
||
| // Add key to the lockset | ||
| txnManager.SaveKeyEntryToLock(ArgSlice.FromPinnedSpan(key.AsReadOnlySpan()), isObject: isObject, LockType.Exclusive); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Process transaction | ||
| static void ProcessTransactionGroupOperations<TContext, TObjectContext>(AofProcessor aofProcessor, TContext context, TObjectContext objectContext, TransactionGroup txnGroup, bool asReplica) | ||
| where TContext : ITsavoriteContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> | ||
| where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator> | ||
| { | ||
| foreach (var entry in txnGroup.operations) | ||
| { | ||
| fixed (byte* entryPtr = entry) | ||
| _ = aofProcessor.ReplayOp(context, objectContext, entryPtr, entry.Length, asReplica: asReplica); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Replay StoredProc wrapper for single and sharded logs | ||
| /// </summary> | ||
| /// <param name="id"></param> | ||
| /// <param name="ptr"></param> | ||
| internal void ReplayStoredProc(byte id, byte* ptr) | ||
| { | ||
| StoredProcRunnerBase(id, ptr); | ||
|
|
||
| // Based run stored proc method used of legacy single log implementation | ||
| void StoredProcRunnerBase(byte id, byte* ptr) | ||
| { | ||
| var curr = ptr + HeaderSize(); | ||
|
|
||
| // Reconstructing CustomProcedureInput | ||
| _ = aofReplayContext.customProcInput.DeserializeFrom(curr); | ||
|
|
||
| // Run the stored procedure with the reconstructed input | ||
| var output = aofReplayContext.output; | ||
| _ = aofProcessor.respServerSession.RunTransactionProc(id, ref aofReplayContext.customProcInput, ref output, isRecovering: true); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT license. | ||
|
|
||
| using System.Collections.Generic; | ||
|
|
||
| namespace Garnet.server | ||
| { | ||
| /// <summary> | ||
| /// Transaction group contains list of operations associated with a given transaction | ||
| /// </summary> | ||
| public class TransactionGroup | ||
| { | ||
| /// <summary> | ||
| /// Transaction operation buffer | ||
| /// </summary> | ||
| public List<byte[]> operations = []; | ||
vazois marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /// <summary> | ||
| /// Clear the underlying buffer that holds the individual transaction operations | ||
| /// </summary> | ||
| public void Clear() => operations.Clear(); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.