Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type ChannelManagerImpl struct {
legacyNodes typeutil.UniqueSet

lastActiveTimestamp time.Time

// Idempotency and restart support
startupMu sync.Mutex // Protects Startup/Close operations
started bool
}

// ChannelBGChecker is a background checker that runs in its own goroutine
Expand Down Expand Up @@ -130,7 +134,19 @@ func NewChannelManager(
}

// Startup initializes the channel manager for the given legacy and current
// node lists. It is idempotent: invoking Startup on an already-started
// manager first shuts the running instance down (via doClose) and then
// starts it again, so repeated Startup calls and Close/Startup cycles are
// both safe.
//
// startupMu serializes Startup and Close so concurrent lifecycle calls
// cannot interleave; the per-state mu is NOT held here.
func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
	m.startupMu.Lock()
	defer m.startupMu.Unlock()

	if m.started {
		// Already started: tear down the current instance before restarting.
		m.doClose()
	}

	return m.doStartup(ctx, legacyNodes, allNodes)
}

func (m *ChannelManagerImpl) doStartup(ctx context.Context, legacyNodes, allNodes []int64) error {
ctx, cancel := context.WithCancel(ctx)

m.legacyNodes = typeutil.NewUniqueSet(legacyNodes...)

Expand Down Expand Up @@ -165,6 +181,10 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
}
m.mu.Unlock()

// All operations succeeded, now set the state
m.cancel = cancel
m.started = true

if m.balanceCheckLoop != nil {
log.Ctx(ctx).Info("starting channel balance loop")
m.wg.Add(1)
Expand All @@ -184,10 +204,24 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
}

// Close shuts the channel manager down. It is safe to call multiple times
// and safe to call on a manager that was never started: doClose returns
// immediately when m.started is false. startupMu serializes Close against
// Startup so a concurrent restart cannot interleave with shutdown.
func (m *ChannelManagerImpl) Close() {
	m.startupMu.Lock()
	defer m.startupMu.Unlock()
	m.doClose()
}

// doClose performs the actual shutdown: it cancels the manager's background
// context, waits for every goroutine tracked by m.wg to exit, and clears the
// started flag. It must only be called with startupMu already held (Close and
// Startup are the locking entry points). Calling it on a manager that never
// started, or that is already closed, is a no-op — this is what makes Close
// idempotent.
func (m *ChannelManagerImpl) doClose() {
	if !m.started {
		// Nothing running; nothing to tear down.
		return
	}

	if cancelBg := m.cancel; cancelBg != nil {
		cancelBg()
		// Block until background loops observe the cancellation and return.
		m.wg.Wait()
	}

	m.started = false
}

func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error {
Expand Down
137 changes: 137 additions & 0 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,3 +904,140 @@ func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
infos = cm.GetChannelWatchInfos()
s.Equal(0, len(infos))
}

// TestStartupIdempotency verifies that repeated Startup calls on the same
// manager are safe: each subsequent call tears down the running instance and
// restarts it, and channel assignments survive the restart.
func (s *ChannelManagerSuite) TestStartupIdempotency() {
	s.Run("repeated Startup calls should be idempotent", func() {
		chNodes := map[string]int64{
			"ch1": 1,
			"ch2": 1,
		}
		s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
		s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe()

		m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2())
		s.Require().NoError(err)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		var (
			legacyNodes = []int64{1}
			allNodes    = []int64{1, 2}
		)

		// First Startup
		err = m.Startup(ctx, legacyNodes, allNodes)
		s.NoError(err)
		s.True(m.started)
		s.checkAssignment(m, 1, "ch1", Legacy)
		s.checkAssignment(m, 1, "ch2", Legacy)

		// Second Startup - should close the running instance and restart
		err = m.Startup(ctx, legacyNodes, allNodes)
		s.NoError(err)
		s.True(m.started)
		s.checkAssignment(m, 1, "ch1", Legacy)
		s.checkAssignment(m, 1, "ch2", Legacy)

		// Third Startup - should still work
		err = m.Startup(ctx, legacyNodes, allNodes)
		s.NoError(err)
		s.True(m.started)
	})
}

// TestStartupAfterClose covers the restart path: a manager that has been
// closed can be started again with a fresh context, and closed once more.
func (s *ChannelManagerSuite) TestStartupAfterClose() {
	s.Run("Startup after Close should restart successfully", func() {
		chNodes := map[string]int64{
			"ch1": 1,
			"ch2": 1,
		}
		s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
		s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe()

		m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2())
		s.Require().NoError(err)

		legacyNodes := []int64{1}
		allNodes := []int64{1}

		firstCtx, cancelFirst := context.WithCancel(context.Background())
		defer cancelFirst()

		// Initial start.
		s.NoError(m.Startup(firstCtx, legacyNodes, allNodes))
		s.True(m.started)
		s.checkAssignment(m, 1, "ch1", Legacy)
		s.checkAssignment(m, 1, "ch2", Legacy)

		// Shut down.
		m.Close()
		s.False(m.started)

		// Restart with a brand-new context.
		secondCtx, cancelSecond := context.WithCancel(context.Background())
		defer cancelSecond()

		s.NoError(m.Startup(secondCtx, legacyNodes, allNodes))
		s.True(m.started)
		s.checkAssignment(m, 1, "ch1", Legacy)
		s.checkAssignment(m, 1, "ch2", Legacy)

		// And shut down again.
		m.Close()
		s.False(m.started)
	})
}

// TestCloseIdempotency checks that Close may be invoked any number of times,
// including on a manager that was never started, without panicking or
// corrupting state.
func (s *ChannelManagerSuite) TestCloseIdempotency() {
	s.Run("multiple Close calls should be idempotent", func() {
		chNodes := map[string]int64{
			"ch1": 1,
		}
		s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
		s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe()

		m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2())
		s.Require().NoError(err)

		runCtx, cancelRun := context.WithCancel(context.Background())
		defer cancelRun()

		// Start once so the first Close has real work to do.
		s.NoError(m.Startup(runCtx, []int64{1}, []int64{1}))
		s.True(m.started)

		// Closing repeatedly must stay a no-op after the first call.
		for i := 0; i < 3; i++ {
			m.Close()
			s.False(m.started)
		}
	})

	s.Run("Close without Startup should be safe", func() {
		s.prepareMeta(nil, 0)
		m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
		s.Require().NoError(err)

		s.False(m.started)

		// A never-started manager must tolerate Close without panicking.
		s.NotPanics(m.Close)

		s.False(m.started)
	})
}
5 changes: 0 additions & 5 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,6 @@ func (s *Server) rewatchDataNodes(sessions map[string]*sessionutil.Session) erro
datanodes = append(datanodes, info)
}

// if err := s.nodeManager.Startup(s.ctx, datanodes); err != nil {
// log.Warn("DataCoord failed to add datanode", zap.Error(err))
// return err
// }

log.Info("DataCoord Cluster Manager start up")
if err := s.cluster.Startup(s.ctx, datanodes); err != nil {
log.Warn("DataCoord Cluster Manager failed to start up", zap.Error(err))
Expand Down
Loading