Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
206 commits
Select commit Hold shift + click to select a range
22f2646
Baseline sketch for Vector ops - hiding algorithm provider behind an …
kevin-montrose Jul 30, 2025
42ed3b9
properly distinguish vector set elements; this version stinks, but ca…
kevin-montrose Aug 15, 2025
12a9210
horrible hacks for 'is a vector set' and 'is hidden'; I don't love ei…
kevin-montrose Aug 15, 2025
91d34ce
some test fixes, before tearing out some of the worse ideas in here
kevin-montrose Aug 15, 2025
8054fd0
horrible hack version of 'make sure vector _element_ keys don't colli…
kevin-montrose Aug 18, 2025
c63006d
move SET to the vector hackery as a demonstration
kevin-montrose Aug 18, 2025
a9889f6
Span-ify locking interface, these will become more important perf wis…
kevin-montrose Aug 19, 2025
c9e7e37
cleanup parsing for VADD and VSIM to match Redis behavior
kevin-montrose Aug 19, 2025
02cd381
prove out VDIM impl
kevin-montrose Aug 19, 2025
4f7cdba
this is unnecessary
kevin-montrose Aug 20, 2025
7135cb3
get delete for vector sets working; fix VDIM
kevin-montrose Aug 20, 2025
8321302
fix vdim test
kevin-montrose Aug 20, 2025
e0f5e1f
fix comment
kevin-montrose Aug 21, 2025
a564d85
document expected VADD error behavior in a test, which currently fails
kevin-montrose Aug 21, 2025
5b975cc
fix more validation
kevin-montrose Aug 25, 2025
881ce84
fix sizing of distance buffer when calling into IVectorService
kevin-montrose Aug 25, 2025
538da83
optimisitic resize outputIds to avoid continuations
kevin-montrose Aug 25, 2025
dcc98be
jank in proc benchmarks for vector options; this commit should not la…
kevin-montrose Aug 26, 2025
1a81eea
blittable types for interop; bool is gone, byte* is gone
kevin-montrose Aug 26, 2025
a9f63e2
fix distinguish length in case where allocations are necessary (thoug…
kevin-montrose Aug 26, 2025
49f82ff
DiskANN as the provider
kevin-montrose Aug 27, 2025
a7e0cbf
sketch out actually loading test data from somewhere
kevin-montrose Aug 27, 2025
cc94513
conditionally include the diskann bits if they're available
kevin-montrose Aug 27, 2025
2c536f5
needs impl
kevin-montrose Aug 27, 2025
b6f83e3
switch to object store for locking as a temporary work around for som…
kevin-montrose Aug 27, 2025
35f1b03
benchmarks run to completion
kevin-montrose Aug 27, 2025
f31140e
more realistic configs
kevin-montrose Aug 27, 2025
6ac06d2
no suffix required here
kevin-montrose Aug 28, 2025
03df36d
bring FILLBENCH inline with BENCHRWMIX
kevin-montrose Aug 28, 2025
ce0346a
Merge branch 'main' into vectorApiPoC
kevin-montrose Sep 2, 2025
5b0c501
Merge branch 'vectorApiPoC' of https://github.com/microsoft/garnet in…
kevin-montrose Sep 2, 2025
0e6bfdb
formatting!
kevin-montrose Sep 2, 2025
675c043
lay framework for some custom quantizations and vector formats
kevin-montrose Sep 2, 2025
7d36ea5
fixes after XB8 additions
kevin-montrose Sep 3, 2025
b13fd5e
switch to an approximation of namespaces; this version won't land, bu…
kevin-montrose Sep 3, 2025
4c834a2
more fixes for namespace additions
kevin-montrose Sep 4, 2025
74d5a45
fix merge
kevin-montrose Sep 4, 2025
7f93ca7
fix replication and migration in cluster
kevin-montrose Sep 4, 2025
1073836
move to NuGet package (currently only in internal feeds) for DiskANN ref
kevin-montrose Sep 5, 2025
b157b10
Merge branch 'main' into vectorApiPoC
kevin-montrose Sep 5, 2025
ac8f396
remove hacky benchmark functions
kevin-montrose Sep 5, 2025
2b0b18c
remove benchmark commands
kevin-montrose Sep 5, 2025
313cbe1
wire up an extension quantizer with some validation
kevin-montrose Sep 5, 2025
5ee1056
expand VADD validation tests (currently failing)
kevin-montrose Sep 5, 2025
a6e5860
add vismember (no implementation), missed it in first pass
kevin-montrose Sep 10, 2025
374dd90
Merge branch 'main' into vectorApiPoC
kevin-montrose Sep 11, 2025
77967a7
fix replication of Vector Sets
kevin-montrose Sep 15, 2025
8f31144
stress replication with vector sets a bit; fix bugs
kevin-montrose Sep 15, 2025
3f1656d
better match Redis behavior
kevin-montrose Sep 15, 2025
477f6f3
fix various bugs with replication fixes
kevin-montrose Sep 16, 2025
6bc3923
log more details if we get the wrong sizes
kevin-montrose Sep 17, 2025
2d66b22
test repeated deletes, this appears to crash in DiskANN
kevin-montrose Sep 18, 2025
a0c1614
fix deletion of vector sets
kevin-montrose Sep 18, 2025
0095cd2
formatting
kevin-montrose Sep 18, 2025
58c405c
sketch out a multi-insert version of IVectorService, probably needed …
kevin-montrose Sep 22, 2025
620a846
explicitly test many replicas with VADD workloads
kevin-montrose Sep 23, 2025
0c41181
spread VADD replication across multiple tasks
kevin-montrose Sep 23, 2025
951a532
fixes for concurrent vadd in replication; more blocking is necessary …
kevin-montrose Sep 24, 2025
056d505
address feedback; switch to manualresetevent for waits instead of spi…
kevin-montrose Sep 24, 2025
bc9e503
merge main
kevin-montrose Sep 24, 2025
56b5639
checking attribute validation logic against Redis (there isn't any...…
kevin-montrose Sep 25, 2025
9573347
sketch out WITHATTRIBS support for VSIM
kevin-montrose Sep 26, 2025
2b6a3d7
fixup VSIM WITHATTRIBS, some locking stuff to be figured out
kevin-montrose Sep 26, 2025
4483c7d
match Redis error messages
kevin-montrose Sep 26, 2025
90ae2ab
match Redis error messages
kevin-montrose Sep 26, 2025
eab7343
log around replication task failures
kevin-montrose Sep 26, 2025
55641db
test more combinations for replication
kevin-montrose Sep 27, 2025
340f74d
more logging around vector set operation failures
kevin-montrose Sep 27, 2025
11345f4
Merge branch 'main' into vectorApiPoC
kevin-montrose Sep 27, 2025
8f8828d
expand tests to validate more attributes
kevin-montrose Sep 29, 2025
99fab5b
fixup some FFI stuff, including validating that K <= L (ie. COUNT <= EF)
kevin-montrose Sep 29, 2025
9459f81
update diskann-garnet; remove copies when accepting XB8 inputs
kevin-montrose Oct 1, 2025
f7e5e72
remove copies when preparing keys
kevin-montrose Oct 1, 2025
4d91cb7
remove the fiction that we're going to use anything but DiskANN for a…
kevin-montrose Oct 1, 2025
6ce6171
support multiple return vector formats so DiskANN can eliminate anoth…
kevin-montrose Oct 1, 2025
07e5c9a
stopgap commit; sketch out a multi-key read_callback that both prefet…
kevin-montrose Oct 1, 2025
1d49f7e
stopgap commit: add missing file
kevin-montrose Oct 2, 2025
25846f8
stopgap commit; correctly set count
kevin-montrose Oct 6, 2025
551bdbf
stopgap commit; rework for IReadArgBatch
kevin-montrose Oct 6, 2025
efcf0f7
stopgap commit; fixup bugs with IReadArgBatch implementation
kevin-montrose Oct 6, 2025
dfec8c5
stopgap commit; correctly capture callback and callback context on Ve…
kevin-montrose Oct 6, 2025
97c53a9
small refactor to avoid extra accesses and recalcs
kevin-montrose Oct 6, 2025
f0501af
bump diskann-garnet
kevin-montrose Oct 7, 2025
1507f01
rework session tracking so we can set it high up during adds, which l…
kevin-montrose Oct 7, 2025
94e4c39
micro optimization around allocating space for vector set index reads
kevin-montrose Oct 7, 2025
24a0ef4
stopgap commit; start working on recovering indexes from disk / witho…
kevin-montrose Oct 7, 2025
d7a331f
suppress for now
kevin-montrose Oct 8, 2025
9104b92
remove temporary copies and allocations from VADD replication
kevin-montrose Oct 8, 2025
333b4e1
fix replication tests by pausing for VADDs to also catch up
kevin-montrose Oct 8, 2025
83815bb
bump diskann-garnet to 1.0.4
kevin-montrose Oct 9, 2025
ec2569d
1.0.4 has issues, rolling back to 1.0.3
kevin-montrose Oct 9, 2025
add0e44
DRY up index reading to simplify recreation and prepare for shared lo…
kevin-montrose Oct 9, 2025
bda2750
extend locking DRY'ing to replication
kevin-montrose Oct 9, 2025
45073c2
bump to 1.0.5
kevin-montrose Oct 9, 2025
66bdbf3
sketch out sharded read locks
kevin-montrose Oct 9, 2025
dc7718c
bump to 1.0.8
kevin-montrose Oct 9, 2025
6d144ac
rework replication to (probably) fix a bad pointer on passed SpanBytes
kevin-montrose Oct 10, 2025
137d39b
implement (sort of) VEMB for debugging purposes
kevin-montrose Oct 10, 2025
eb393c2
fix merge
kevin-montrose Oct 10, 2025
0aa68d1
stopgap commit; get some stopwatch based logging in for diagnostics
kevin-montrose Oct 11, 2025
e15ffb0
Revert "stopgap commit; get some stopwatch based logging in for diagn…
kevin-montrose Oct 12, 2025
bda5799
less naive prefetch approach, working in batches of 12 and only if we…
kevin-montrose Oct 12, 2025
862f3f1
JIT may not be smart enough to elide these bounds checks, so just go …
kevin-montrose Oct 12, 2025
ff8ec6d
bump diskann and garnet release version
kevin-montrose Oct 13, 2025
559bb9e
fail deadly while upstream Entra fixes are rolling out
kevin-montrose Oct 13, 2025
b8428da
memory corruption bug somewhere - kick up DiskANN in the optimistic h…
kevin-montrose Oct 14, 2025
a43d6a7
change stress amounts
kevin-montrose Oct 14, 2025
73b2122
diskann is hard assuming 75 for now, so change tests accordingly
kevin-montrose Oct 14, 2025
9389ee3
more bounds checking, more logging, let's find this corruption
kevin-montrose Oct 14, 2025
3dfc58a
sketch out VREM
kevin-montrose Oct 15, 2025
1b8a517
DRY up dimension calculation on VADD
kevin-montrose Oct 15, 2025
7856461
don't return success if delete didn't do anything
kevin-montrose Oct 15, 2025
a96efd9
tweak library resolution logic; when hosted as a service on Linux, cu…
kevin-montrose Oct 16, 2025
9fc01e1
bump version
kevin-montrose Oct 16, 2025
2682dc8
be more defensive, though shouldn't really matter; also log more on f…
kevin-montrose Oct 18, 2025
62394a0
Revert "rework replication to (probably) fix a bad pointer on passed …
kevin-montrose Oct 18, 2025
e19f77a
Revert "fix replication tests by pausing for VADDs to also catch up"
kevin-montrose Oct 18, 2025
dbf72b4
Revert "remove temporary copies and allocations from VADD replication"
kevin-montrose Oct 18, 2025
a34ecb6
after reverting replication optimizations, bump version
kevin-montrose Oct 18, 2025
5d159ae
ruled out corruption, remove all these bounds checks and other valida…
kevin-montrose Oct 20, 2025
f811d63
bump diskann-garnet; VREM implemented and VREM replication tested
kevin-montrose Oct 20, 2025
c93a36f
deleting a vector set causes its internal values to be cleanedup (ver…
kevin-montrose Oct 21, 2025
b94494b
fix DEL replays w.r.t. vector sets
kevin-montrose Oct 21, 2025
ac54543
more bits for diskann in context
kevin-montrose Oct 21, 2025
3b1b94c
diskann-garnet to .12, attributes now managed on that side
kevin-montrose Oct 21, 2025
abee408
exclude vector set data from a number of places; get most (all?) test…
kevin-montrose Oct 21, 2025
d7281e3
fixes for recovery, more tests for recovery, diskann-garnet needs som…
kevin-montrose Oct 22, 2025
81a77a5
temp hack around a re-entrancy issue
kevin-montrose Oct 23, 2025
5801363
hack harder
kevin-montrose Oct 23, 2025
b40b4b4
fix recovery test
kevin-montrose Oct 23, 2025
85e7a64
bump to .13
kevin-montrose Oct 23, 2025
f7824ae
bump diskann-garnet to fix bugs
kevin-montrose Oct 24, 2025
37f5b27
restart cleanups upon recovery
kevin-montrose Oct 24, 2025
55fa2cb
start a design doc now that we're mostly nailed down the PoC
kevin-montrose Oct 24, 2025
4a0da48
Remove dead code; we're not using multiinsert right now, and won't fo…
kevin-montrose Oct 24, 2025
133a883
remove more dead code
kevin-montrose Oct 24, 2025
365da1b
finish up first draft of vector-sets.md
kevin-montrose Oct 24, 2025
79a4720
fixup some links
kevin-montrose Oct 24, 2025
b310c5b
naturally, a typo in the first two lines
kevin-montrose Oct 24, 2025
3f192c1
formatting
kevin-montrose Oct 24, 2025
8a46d85
typos
kevin-montrose Oct 24, 2025
bb91ff4
more typos
kevin-montrose Oct 24, 2025
ec74024
note migration is still a WIP
kevin-montrose Oct 26, 2025
a58afad
remove hack from index creation
kevin-montrose Oct 27, 2025
79a4523
remove hack from index recreation
kevin-montrose Oct 27, 2025
54d648a
expand tests
kevin-montrose Oct 27, 2025
49e0aa6
merge main
kevin-montrose Oct 28, 2025
8e09458
fix tests
kevin-montrose Oct 28, 2025
0f1f627
fix tests
kevin-montrose Oct 28, 2025
f7caec2
sketch out rmw callback for DiskANN
kevin-montrose Oct 28, 2025
70e694b
don't roll version back
kevin-montrose Oct 28, 2025
ee5fbe7
fix a bunch of typos
kevin-montrose Oct 29, 2025
a5ffc57
more corrections and cleanup upon review
kevin-montrose Oct 29, 2025
f7e87d0
move VectorManager onto GarnetDatabase, preparing for multi-DB testing
kevin-montrose Oct 29, 2025
3a99f40
mention docs
kevin-montrose Oct 29, 2025
b68dbf5
implement copy-update functions, I seem to have misunderstood the poi…
kevin-montrose Oct 29, 2025
294e1b4
knock our remainder of recreate tests
kevin-montrose Oct 30, 2025
3a5d180
track hash slots with vector set metadata
kevin-montrose Oct 30, 2025
1f0b297
add (failing) basic migration test
kevin-montrose Oct 30, 2025
eb84bd7
stopgap commit; sketch out and document the migration flow
kevin-montrose Oct 30, 2025
74098e5
stopgap commit; primary -> primary for _hash slots_ works; replicas d…
kevin-montrose Oct 31, 2025
f71b4d8
stopgap commit; all Vector Set tests passing, though there's still mi…
kevin-montrose Oct 31, 2025
8f5f8de
fix tests
kevin-montrose Nov 3, 2025
cb6ee17
replicas now follow migrated primary Vector Sets; needs a lot more te…
kevin-montrose Nov 3, 2025
1f11849
migrate ... keys implemented, which wraps up migration (in theory)
kevin-montrose Nov 4, 2025
97478a1
Merge branch 'main' into vectorApiPoC
kevin-montrose Nov 4, 2025
2d9b303
fix tests; all tests passing now
kevin-montrose Nov 4, 2025
c37af2e
test moving multiple vector sets to a primary that already has vector…
kevin-montrose Nov 5, 2025
67b0180
more vector set migration tests, and fixes
kevin-montrose Nov 5, 2025
8408dc6
Rework timeouts for some cluster migration tests
kevin-montrose Nov 5, 2025
120bf69
stopgap commit; lots of hackery to try and make writes during migrati…
kevin-montrose Nov 7, 2025
9f1be80
stopgap commit; this appears to work, need to stress and remove lots …
kevin-montrose Nov 7, 2025
4b08056
stopgap commit; remove a bunch of hackery and logging
kevin-montrose Nov 7, 2025
c277e5c
stress test is still a bit flaky, but there are common non-Vector Set…
kevin-montrose Nov 7, 2025
0a902da
note blocking during migrations in vector-sets.md
kevin-montrose Nov 7, 2025
9a1eeac
restore AAD, this is long since debuged
kevin-montrose Nov 9, 2025
d10a29e
knock a number of hacks out
kevin-montrose Nov 9, 2025
a8f3708
remove another hack
kevin-montrose Nov 9, 2025
51ef75a
hide Vector Sets behind a feature flag - flag defaults on for tests, …
kevin-montrose Nov 9, 2025
f4c1508
dry up exclusive lock acquisition
kevin-montrose Nov 9, 2025
1d62b34
split VectorManager up to make easier to review
kevin-montrose Nov 9, 2025
6704cd5
knock out more todos
kevin-montrose Nov 9, 2025
7173239
cleanup after migration failures
kevin-montrose Nov 9, 2025
e398a85
this TODO is invalid
kevin-montrose Nov 9, 2025
aa6ea9e
don't bump version
kevin-montrose Nov 10, 2025
992fde8
implement ReadWithPrefetch (pulled off of vectorApiPoC work)
kevin-montrose Nov 10, 2025
c78b142
revert change to NativeStorageDevice, not needed as part of Vector Sets
kevin-montrose Nov 10, 2025
aa780f8
formatting
kevin-montrose Nov 10, 2025
a228dc8
actually bump to latest internal, rather than leaving this stashed
kevin-montrose Nov 10, 2025
b75c489
move MGET (normal and scatter-gather) onto ReadWithPrefetch
kevin-montrose Nov 12, 2025
4ab4e6b
address feedback
kevin-montrose Nov 13, 2025
69f8a24
Merge branch 'main' into readWithPrefetch
kevin-montrose Nov 14, 2025
33a8cc3
document that 4-bytes before key for RMW callback is required
kevin-montrose Nov 14, 2025
ca9d14a
move method to migration partial
kevin-montrose Nov 14, 2025
d1d9d6b
correctly update session metrics with new MGET impls
kevin-montrose Nov 14, 2025
9f22e84
Merge branch 'main' into vectorApiPoC
kevin-montrose Nov 17, 2025
c4c4d45
fix merge
kevin-montrose Nov 17, 2025
6ad3dd8
stopgap commit; sketch out alternative locking scheme to replace obje…
kevin-montrose Nov 18, 2025
26f4a86
tweaks to locking impl after some benchmarking
kevin-montrose Nov 19, 2025
c9d2db8
clarify docs, naming, and the 'why' of some optimizations in new lock…
kevin-montrose Nov 19, 2025
505916b
handle feedback; rather than process number, use a thread static whic…
kevin-montrose Nov 20, 2025
f2ee221
formatting
kevin-montrose Nov 20, 2025
e3d1682
Merge branch 'main' into vectorApiPoC
kevin-montrose Nov 20, 2025
9865327
fix merge
kevin-montrose Nov 20, 2025
4cee80c
fix website build
kevin-montrose Nov 20, 2025
a869917
address feedback; generalize vector set locks, move and rename
kevin-montrose Nov 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@
<PackageVersion Include="System.Numerics.Tensors" Version="9.0.9" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="9.0.8" />
<PackageVersion Include="Microsoft.Extensions.Hosting.WindowsServices" Version="9.0.8" />
<PackageVersion Include="diskann-garnet" Version="1.0.15" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo

static ReadOnlySpan<byte> MAIN_STORE => "SSTORE"u8;
static ReadOnlySpan<byte> OBJECT_STORE => "OSTORE"u8;
static ReadOnlySpan<byte> VECTOR_STORE => "VSTORE"u8;
static ReadOnlySpan<byte> T => "T"u8;
static ReadOnlySpan<byte> F => "F"u8;

Expand Down Expand Up @@ -170,14 +171,30 @@ public Task<string> SetSlotRange(Memory<byte> state, string nodeid, List<(int, i
/// <param name="sourceNodeId"></param>
/// <param name="replace"></param>
/// <param name="isMainStore"></param>
public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMainStore)
public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMainStore, bool isVectorSets)
{
currTcsIterationTask = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.MIGRATE;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;
ReadOnlySpan<byte> storeType;
if (isMainStore)
{
if (isVectorSets)
{
storeType = VECTOR_STORE;
}
else
{
storeType = MAIN_STORE;
}
}
else
{
storeType = OBJECT_STORE;
}

var replaceOption = replace ? T : F;

var arraySize = 6;
Expand Down Expand Up @@ -249,7 +266,7 @@ public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMa
/// <returns></returns>
public Task<string> CompleteMigrate(string sourceNodeId, bool replace, bool isMainStore)
{
SetClusterMigrateHeader(sourceNodeId, replace, isMainStore);
SetClusterMigrateHeader(sourceNodeId, replace, isMainStore, isVectorSets: false);

Debug.Assert(end - curr >= 2);
*curr++ = (byte)'\r';
Expand Down
25 changes: 15 additions & 10 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,27 @@ public string GetInfo()
public static string GetRange(int[] slots)
{
var range = "> ";
var start = slots[0];
var end = slots[0];
for (var i = 1; i < slots.Length + 1; i++)
if (slots.Length >= 1)
{
if (i < slots.Length && slots[i] == end + 1)
end = slots[i];
else

var start = slots[0];
var end = slots[0];
for (var i = 1; i < slots.Length + 1; i++)
{
range += $"{start}-{end} ";
if (i < slots.Length)
{
start = slots[i];
if (i < slots.Length && slots[i] == end + 1)
end = slots[i];
else
{
range += $"{start}-{end} ";
if (i < slots.Length)
{
start = slots[i];
end = slots[i];
}
}
}
}

return range;
}

Expand Down
5 changes: 4 additions & 1 deletion libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ namespace Garnet.cluster
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>,
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>>;
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>,
BasicContext<SpanByte, SpanByte, VectorInput, SpanByte, long, VectorSessionFunctions,
/* VectorStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>>;

/// <summary>
/// Cluster manager
Expand Down
15 changes: 12 additions & 3 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@

namespace Garnet.cluster
{
using BasicContext = BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>;

using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>,
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>>;
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>,
BasicContext<SpanByte, SpanByte, VectorInput, SpanByte, long, VectorSessionFunctions,
/* VectorStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>>;

using VectorContext = BasicContext<SpanByte, SpanByte, VectorInput, SpanByte, long, VectorSessionFunctions, StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>, SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>;

/// <summary>
/// Cluster provider
Expand Down Expand Up @@ -100,8 +109,8 @@ public void Start()
}

/// <inheritdoc />
public IClusterSession CreateClusterSession(TransactionManager txnManager, IGarnetAuthenticator authenticator, UserHandle userHandle, GarnetSessionMetrics garnetSessionMetrics, BasicGarnetApi basicGarnetApi, INetworkSender networkSender, ILogger logger = null)
=> new ClusterSession(this, txnManager, authenticator, userHandle, garnetSessionMetrics, basicGarnetApi, networkSender, logger);
public IClusterSession CreateClusterSession(TransactionManager txnManager, IGarnetAuthenticator authenticator, UserHandle userHandle, GarnetSessionMetrics garnetSessionMetrics, BasicGarnetApi basicGarnetApi, BasicContext basicContext, VectorContext vectorContext, INetworkSender networkSender, ILogger logger = null)
=> new ClusterSession(this, txnManager, authenticator, userHandle, garnetSessionMetrics, basicGarnetApi, basicContext, vectorContext, networkSender, logger);

/// <inheritdoc />
public void UpdateClusterAuth(string clusterUsername, string clusterPassword)
Expand Down
118 changes: 112 additions & 6 deletions libs/cluster/Server/Migration/MigrateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// Licensed under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Garnet.client;
using Garnet.server;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.cluster
Expand All @@ -18,16 +20,28 @@ internal sealed partial class MigrateOperation
public MainStoreScan mss;
public ObjectStoreScan oss;

public readonly ConcurrentDictionary<byte[], byte[]> vectorSetsIndexKeysToMigrate;
#if NET9_0_OR_GREATER
private readonly ConcurrentDictionary<byte[], byte[]>.AlternateLookup<ReadOnlySpan<byte>> vectorSetsIndexKeysToMigrateLookup;
#endif

readonly MigrateSession session;
readonly GarnetClientSession gcs;
readonly LocalServerSession localServerSession;

public GarnetClientSession Client => gcs;

public IEnumerable<KeyValuePair<byte[], byte[]>> VectorSets => vectorSetsIndexKeysToMigrate;

public void ThrowIfCancelled() => session._cts.Token.ThrowIfCancellationRequested();

public bool Contains(int slot) => session._sslots.Contains(slot);

/// <summary>
/// Check whether the given namespace is one the owning migrate session is tracking.
/// </summary>
/// <param name="ns">Namespace to look up.</param>
/// <returns>True if the session has a namespace collection and it contains <paramref name="ns"/>; false otherwise (including when the collection is null).</returns>
public bool ContainsNamespace(ulong ns)
{
    var namespaces = session._namespaces;
    return namespaces is not null && namespaces.Contains(ns);
}

/// <summary>
/// Remember a Vector Set index key (and its value) that was seen during a scan so it can be migrated later.
/// </summary>
/// <param name="key">Vector Set index key.</param>
/// <param name="value">Value stored under <paramref name="key"/>.</param>
public void EncounteredVectorSet(byte[] key, byte[] value)
{
    // TryAdd does not overwrite an existing entry; whether the add happened is deliberately ignored
    _ = vectorSetsIndexKeysToMigrate.TryAdd(key, value);
}

public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchSize = 1 << 18)
{
this.session = session;
Expand All @@ -37,6 +51,10 @@ public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchS
mss = new MainStoreScan(this);
oss = new ObjectStoreScan(this);
keysToDelete = [];
vectorSetsIndexKeysToMigrate = new(ByteArrayComparer.Instance);
#if NET9_0_OR_GREATER
vectorSetsIndexKeysToMigrateLookup = vectorSetsIndexKeysToMigrate.GetAlternateLookup<ReadOnlySpan<byte>>();
#endif
}

public bool Initialize()
Expand Down Expand Up @@ -72,7 +90,7 @@ public void Scan(StoreType storeType, ref long currentAddress, long endAddress)
/// </summary>
/// <param name="storeType"></param>
/// <returns></returns>
public bool TrasmitSlots(StoreType storeType)
public bool TransmitSlots(StoreType storeType)
{
var bufferSize = 1 << 10;
SectorAlignedMemory buffer = new(bufferSize, 1);
Expand All @@ -87,7 +105,7 @@ public bool TrasmitSlots(StoreType storeType)
{
foreach (var key in sketch.argSliceVector)
{
var spanByte = key.SpanByte;
var spanByte = key;
if (!session.WriteOrSendMainStoreKeyValuePair(gcs, localServerSession, ref spanByte, ref input, ref o, out _))
return false;

Expand Down Expand Up @@ -117,7 +135,10 @@ public bool TrasmitSlots(StoreType storeType)
return true;
}

public bool TransmitKeys(StoreType storeType)
/// <summary>
/// Move keys in sketch out of the given store, UNLESS they are also in <paramref name="vectorSetKeysToIgnore"/>.
/// </summary>
public bool TransmitKeys(StoreType storeType, Dictionary<byte[], byte[]> vectorSetKeysToIgnore)
{
var bufferSize = 1 << 10;
SectorAlignedMemory buffer = new(bufferSize, 1);
Expand All @@ -131,12 +152,30 @@ public bool TransmitKeys(StoreType storeType)
var keys = sketch.Keys;
if (storeType == StoreType.Main)
{
#if NET9_0_OR_GREATER
var ignoreLookup = vectorSetKeysToIgnore.GetAlternateLookup<ReadOnlySpan<byte>>();
#endif

for (var i = 0; i < keys.Count; i++)
{
if (keys[i].Item2)
continue;

var spanByte = keys[i].Item1.SpanByte;

// Don't transmit if a Vector Set
var isVectorSet =
vectorSetKeysToIgnore.Count > 0 &&
#if NET9_0_OR_GREATER
ignoreLookup.ContainsKey(spanByte.AsReadOnlySpan());
#else
vectorSetKeysToIgnore.ContainsKey(spanByte.ToByteArray());
#endif
if (isVectorSet)
{
continue;
}

if (!session.WriteOrSendMainStoreKeyValuePair(gcs, localServerSession, ref spanByte, ref input, ref o, out var status))
return false;

Expand All @@ -158,8 +197,8 @@ public bool TransmitKeys(StoreType storeType)
if (keys[i].Item2)
continue;

var argSlice = keys[i].Item1;
if (!session.WriteOrSendObjectStoreKeyValuePair(gcs, localServerSession, ref argSlice, out var status))
var spanByte = keys[i].Item1.SpanByte;
if (!session.WriteOrSendObjectStoreKeyValuePair(gcs, localServerSession, ref spanByte, out var status))
return false;

// Skip if key NOTFOUND
Expand All @@ -182,6 +221,54 @@ public bool TransmitKeys(StoreType storeType)
return true;
}

/// <summary>
/// Transmit data in namespaces during a MIGRATE ... KEYS operation.
///
/// Doesn't delete anything, just scans and transmits.
/// </summary>
/// <param name="logger">Logger used for progress and failure messages; may be null.</param>
/// <returns>
/// False if initialization fails or any scanned batch fails to transmit;
/// true once a scan pass finds no further keys.
/// </returns>
public bool TransmitKeysNamespaces(ILogger logger)
{
    if (!Initialize())
        return false;

    // Scan range is fixed up front from the main store log's begin and tail addresses
    var workerStartAddress = session.clusterProvider.storeWrapper.store.Log.BeginAddress;
    var workerEndAddress = session.clusterProvider.storeWrapper.store.Log.TailAddress;

    var cursor = workerStartAddress;
    logger?.LogWarning("<MainStore> migrate keys (namespaces) scan range [{workerStartAddress}, {workerEndAddress}]", workerStartAddress, workerEndAddress);
    while (true)
    {
        var current = cursor;

        // Build Sketch
        sketch.SetStatus(SketchStatus.INITIALIZING);
        Scan(StoreType.Main, ref current, workerEndAddress);

        // Stop if no keys have been found
        if (sketch.argSliceVector.IsEmpty) break;

        logger?.LogWarning("Scan from {cursor} to {current} and discovered {count} keys", cursor, current, sketch.argSliceVector.Count);

        // Mark the sketch as TRANSMITTING and wait for that to propagate before sending
        sketch.SetStatus(SketchStatus.TRANSMITTING);
        session.WaitForConfigPropagation();

        // Transmit all keys gathered; a failed batch must fail the whole operation
        // rather than being silently reported as success
        if (!TransmitSlots(StoreType.Main))
        {
            logger?.LogError("Failed to transmit namespace keys scanned from {cursor} to {current}", cursor, current);
            return false;
        }

        // Mark the sketch as DELETING and wait for propagation; this method itself deletes nothing
        sketch.SetStatus(SketchStatus.DELETING);
        session.WaitForConfigPropagation();

        // Clear keys from buffer and resume scanning where this pass stopped
        sketch.Clear();
        cursor = current;
    }

    return true;
}

/// <summary>
/// Delete keys after migration if copyOption is not set
/// </summary>
Expand All @@ -193,7 +280,13 @@ public void DeleteKeys()
{
foreach (var key in sketch.argSliceVector)
{
var spanByte = key.SpanByte;
if (key.MetadataSize == 1)
{
// Namespace'd keys are not deleted here, but when migration finishes
continue;
}

var spanByte = key;
_ = localServerSession.BasicGarnetApi.DELETE(ref spanByte);
}
}
Expand All @@ -209,6 +302,19 @@ public void DeleteKeys()
}
}
}

/// <summary>
/// Remove a migrated Vector Set from the local store; a no-op when the session's _copyOption is set.
/// </summary>
/// <param name="key">Key of the Vector Set to delete.</param>
public void DeleteVectorSet(ref SpanByte key)
{
    if (!session._copyOption)
    {
        var status = localServerSession.BasicGarnetApi.DELETE(ref key);

        // Key is decoded as UTF-8 purely for the log message
        session.logger?.LogDebug(
            "Deleting Vector Set {key} after migration: {delRes}",
            System.Text.Encoding.UTF8.GetString(key.AsReadOnlySpan()),
            status);
    }
}
}
}
}
32 changes: 28 additions & 4 deletions libs/cluster/Server/Migration/MigrateScanFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,34 @@ public unsafe bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMeta
if (ClusterSession.Expired(ref value))
return true;

var s = HashSlotUtils.HashSlot(ref key);
// Check if key belongs to slot that is being migrated and if it can be added to our buffer
if (mss.Contains(s) && !mss.sketch.TryHashAndStore(key.AsSpan()))
return false;
// TODO: Some other way to detect namespaces
if (key.MetadataSize == 1)
{
var ns = key.GetNamespaceInPayload();

if (mss.ContainsNamespace(ns) && !mss.sketch.TryHashAndStore(ns, key.AsSpan()))
return false;
}
else
{
var s = HashSlotUtils.HashSlot(ref key);

// Check if key belongs to slot that is being migrated...
if (mss.Contains(s))
{
if (recordMetadata.RecordInfo.VectorSet)
{
// We can't delete the vector set _yet_ nor can we migrate it,
// we just need to remember it to migrate once the associated namespaces are all moved over
mss.EncounteredVectorSet(key.ToByteArray(), value.ToByteArray());
}
else if (!mss.sketch.TryHashAndStore(key.AsSpan()))
{
// Out of space, end scan for now
return false;
}
}
}

return true;
}
Expand Down
Loading
Loading