Skip to content

Commit 855e5d9

Browse files
committed
Merge remote-tracking branch 'upstream/master' into base64encode
2 parents f473cc8 + c8073eb commit 855e5d9

File tree

7 files changed

+41
-14
lines changed

7 files changed

+41
-14
lines changed

internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package broadcaster
33
import (
44
"context"
55
"sort"
6+
"sync"
67
"time"
78

89
"github.com/cenkalti/backoff/v4"
@@ -20,7 +21,8 @@ func newAckCallbackScheduler(logger *log.MLogger) *ackCallbackScheduler {
2021
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
2122
pending: make(chan *broadcastTask, 16),
2223
triggerChan: make(chan struct{}, 1),
23-
rkLocker: newResourceKeyLocker(newBroadcasterMetrics()),
24+
rkLockerMu: sync.Mutex{},
25+
rkLocker: newResourceKeyLocker(),
2426
tombstoneScheduler: newTombstoneScheduler(logger),
2527
}
2628
s.SetLogger(logger)
@@ -41,6 +43,13 @@ type ackCallbackScheduler struct {
4143
// Meanwhile the timetick order of any vchannel of those two tasks are same with the order of broadcastID,
4244
// so the smaller broadcastID task is always acked before the larger broadcastID task.
4345
// so we can execute the tasks in the order of the broadcastID to guarantee the ack order is the same as the wal order.
46+
rkLockerMu sync.Mutex // because batch lock operation will be executed on rkLocker,
47+
// so we may encounter following cases:
48+
// 1. tasks A, B, C are competing for rkLocker, and we want the operations to be executed in the order A -> B -> C.
49+
// 2. A is running, and B, C are waiting for the lock.
50+
// 3. During triggerAckCallback, B fails to acquire the lock, and C has not yet called FastLock.
51+
// 4. Then A finishes and releases the lock; C acquires it and executes the ack callback, so the order is broken: A -> C -> B.
52+
// To avoid breaking the order, we use a mutex to protect the batch lock operation.
4453
rkLocker *resourceKeyLocker // it is used to lock the resource-key of ack operation.
4554
// it is not same instance with the resourceKeyLocker in the broadcastTaskManager.
4655
// because it is just used to check if the resource-key is locked when acked.
@@ -116,6 +125,9 @@ func (s *ackCallbackScheduler) addBroadcastTask(task *broadcastTask) error {
116125

117126
// triggerAckCallback triggers the ack callback.
118127
func (s *ackCallbackScheduler) triggerAckCallback() {
128+
s.rkLockerMu.Lock()
129+
defer s.rkLockerMu.Unlock()
130+
119131
pendingTasks := make([]*broadcastTask, 0, len(s.pendingAckedTasks))
120132
for _, task := range s.pendingAckedTasks {
121133
g, err := s.rkLocker.FastLock(task.Header().ResourceKeys.Collect()...)
@@ -134,7 +146,10 @@ func (s *ackCallbackScheduler) triggerAckCallback() {
134146
func (s *ackCallbackScheduler) doAckCallback(bt *broadcastTask, g *lockGuards) (err error) {
135147
logger := s.Logger().With(zap.Uint64("broadcastID", bt.Header().BroadcastID))
136148
defer func() {
149+
s.rkLockerMu.Lock()
137150
g.Unlock()
151+
s.rkLockerMu.Unlock()
152+
138153
s.triggerChan <- struct{}{}
139154
if err == nil {
140155
logger.Info("execute ack callback done")

internal/streamingcoord/server/broadcaster/broadcast_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func RecoverBroadcaster(ctx context.Context) (Broadcaster, error) {
3535
func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) *broadcastTaskManager {
3636
logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster"))
3737
metrics := newBroadcasterMetrics()
38-
rkLocker := newResourceKeyLocker(metrics)
38+
rkLocker := newResourceKeyLocker()
3939
ackScheduler := newAckCallbackScheduler(logger)
4040

4141
recoveryTasks := make([]*broadcastTask, 0, len(protos))

internal/streamingcoord/server/broadcaster/broadcast_task.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadc
2929
ackCallbackScheduler: ackCallbackScheduler,
3030
done: make(chan struct{}),
3131
allAcked: make(chan struct{}),
32+
allAckedClosed: false,
3233
}
3334
if isAllDone(bt.task) {
34-
close(bt.allAcked)
35+
bt.closeAllAcked()
3536
}
3637
if proto.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_TOMBSTONE {
3738
close(bt.done)
@@ -58,6 +59,7 @@ func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, m
5859
ackCallbackScheduler: ackCallbackScheduler,
5960
done: make(chan struct{}),
6061
allAcked: make(chan struct{}),
62+
allAckedClosed: false,
6163
}
6264
return bt
6365
}
@@ -83,6 +85,7 @@ type broadcastTask struct {
8385
dirty bool // a flag to indicate that the task has been modified and needs to be saved into the recovery info.
8486
done chan struct{}
8587
allAcked chan struct{}
88+
allAckedClosed bool
8689
guards *lockGuards
8790
ackCallbackScheduler *ackCallbackScheduler
8891
joinAckCallbackScheduled bool // a flag to indicate that the join ack callback is scheduled.
@@ -248,11 +251,20 @@ func (b *broadcastTask) ack(ctx context.Context, msgs ...message.ImmutableMessag
248251
b.joinAckCallbackScheduled = true
249252
}
250253
if allDone {
251-
close(b.allAcked)
254+
b.closeAllAcked()
252255
}
253256
return nil
254257
}
255258

259+
// closeAllAcked closes the allAcked channel at most once.
// The allAckedClosed flag records that the channel has already been closed, so a
// repeated call returns early instead of closing again (closing an already-closed
// channel panics in Go).
// NOTE(review): the flag is read and written here without any synchronization in
// this function; presumably callers serialize access to the task — confirm.
260+
func (b *broadcastTask) closeAllAcked() {
261+
if b.allAckedClosed {
262+
return
263+
}
264+
close(b.allAcked)
265+
b.allAckedClosed = true
266+
}
267+
256268
// hasControlChannel checks if the control channel is broadcasted.
257269
// for the operation since 2.6.5, the control channel is always broadcasted.
258270
// so it's just a dummy function for compatibility.

internal/streamingcoord/server/broadcaster/resource_key_locker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
var errFastLockFailed = errors.New("fast lock failed")
1616

1717
// newResourceKeyLocker creates a new resource key locker.
18-
func newResourceKeyLocker(metrics *broadcasterMetrics) *resourceKeyLocker {
18+
func newResourceKeyLocker() *resourceKeyLocker {
1919
return &resourceKeyLocker{
2020
inner: lock.NewKeyLock[resourceLockKey](),
2121
}

internal/streamingcoord/server/broadcaster/resource_key_locker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
func TestResourceKeyLocker(t *testing.T) {
1515
t.Run("concurrent lock/unlock", func(t *testing.T) {
16-
locker := newResourceKeyLocker(newBroadcasterMetrics())
16+
locker := newResourceKeyLocker()
1717
const numGoroutines = 10
1818
const numKeys = 5
1919
const numIterations = 100
@@ -70,7 +70,7 @@ func TestResourceKeyLocker(t *testing.T) {
7070
})
7171

7272
t.Run("deadlock prevention", func(t *testing.T) {
73-
locker := newResourceKeyLocker(newBroadcasterMetrics())
73+
locker := newResourceKeyLocker()
7474
key1 := message.NewCollectionNameResourceKey("test_collection_1")
7575
key2 := message.NewCollectionNameResourceKey("test_collection_2")
7676

@@ -108,7 +108,7 @@ func TestResourceKeyLocker(t *testing.T) {
108108
})
109109

110110
t.Run("fast lock", func(t *testing.T) {
111-
locker := newResourceKeyLocker(newBroadcasterMetrics())
111+
locker := newResourceKeyLocker()
112112
key := message.NewCollectionNameResourceKey("test_collection")
113113

114114
// First fast lock should succeed

pkg/util/paramtable/component_param.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6378,21 +6378,21 @@ too few tombstones may lead to ABA issues in the state of milvus cluster.`,
63786378
p.WALBroadcasterTombstoneMaxCount = ParamItem{
63796379
Key: "streaming.walBroadcaster.tombstone.maxCount",
63806380
Version: "2.6.0",
6381-
Doc: `The max count of tombstone, 256 by default.
6381+
Doc: `The max count of tombstone, 8192 by default.
63826382
Tombstone is used to reject duplicate submissions of DDL messages,
63836383
too few tombstones may lead to ABA issues in the state of milvus cluster.`,
6384-
DefaultValue: "256",
6384+
DefaultValue: "8192",
63856385
Export: false,
63866386
}
63876387
p.WALBroadcasterTombstoneMaxCount.Init(base.mgr)
63886388

63896389
p.WALBroadcasterTombstoneMaxLifetime = ParamItem{
63906390
Key: "streaming.walBroadcaster.tombstone.maxLifetime",
63916391
Version: "2.6.0",
6392-
Doc: `The max lifetime of tombstone, 30m by default.
6392+
Doc: `The max lifetime of tombstone, 24h by default.
63936393
Tombstone is used to reject duplicate submissions of DDL messages,
63946394
too few tombstones may lead to ABA issues in the state of milvus cluster.`,
6395-
DefaultValue: "30m",
6395+
DefaultValue: "24h",
63966396
Export: false,
63976397
}
63986398
p.WALBroadcasterTombstoneMaxLifetime.Init(base.mgr)

pkg/util/paramtable/component_param_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -669,8 +669,8 @@ func TestComponentParam(t *testing.T) {
669669
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse())
670670
assert.Equal(t, 4.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
671671
assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALBroadcasterTombstoneCheckInternal.GetAsDurationByParse())
672-
assert.Equal(t, 256, params.StreamingCfg.WALBroadcasterTombstoneMaxCount.GetAsInt())
673-
assert.Equal(t, 30*time.Minute, params.StreamingCfg.WALBroadcasterTombstoneMaxLifetime.GetAsDurationByParse())
672+
assert.Equal(t, 8192, params.StreamingCfg.WALBroadcasterTombstoneMaxCount.GetAsInt())
673+
assert.Equal(t, 24*time.Hour, params.StreamingCfg.WALBroadcasterTombstoneMaxLifetime.GetAsDurationByParse())
674674
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
675675
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())
676676
assert.Equal(t, int64(64*1024*1024), params.StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())

0 commit comments

Comments
 (0)