diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go
index cbaee55d5..d592760e3 100644
--- a/cmd/hubagent/workload/setup.go
+++ b/cmd/hubagent/workload/setup.go
@@ -357,7 +357,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 	defaultProfile := profile.NewDefaultProfile()
 	defaultFramework := framework.NewFramework(defaultProfile, mgr)
 	defaultSchedulingQueue := queue.NewSimplePlacementSchedulingQueue(
-		queue.WithName(schedulerQueueName),
+		schedulerQueueName, nil,
 	)
 	// we use one scheduler for every 10 concurrent placement
 	defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr,
diff --git a/pkg/scheduler/queue/batched.go b/pkg/scheduler/queue/batched.go
new file mode 100644
index 000000000..003b05a1b
--- /dev/null
+++ b/pkg/scheduler/queue/batched.go
@@ -0,0 +1,233 @@
+/*
+Copyright 2025 The KubeFleet Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+const (
+	maxNumberOfKeysToMoveFromBatchedToActiveQueuePerGo = 20000
+)
+
+// batchedProcessingPlacementSchedulingQueue implements the PlacementSchedulingQueue
+// interface.
+//
+// It consists of two work queues, allowing both immediate and batched processing of
+// scheduling-related events (changes) of different responsiveness levels.
+type batchedProcessingPlacementSchedulingQueue struct {
+	active  workqueue.TypedRateLimitingInterface[any]
+	batched workqueue.TypedRateLimitingInterface[any]
+
+	moveNow           chan struct{}
+	movePeriodSeconds int32
+}
+
+// Verify that batchedProcessingPlacementSchedulingQueue implements
+// PlacementSchedulingQueue at compile time.
+var _ PlacementSchedulingQueue = &batchedProcessingPlacementSchedulingQueue{}
+
+// batchedProcessingPlacementSchedulingQueueOptions are the options for the
+// batchedProcessingPlacementSchedulingQueue.
+type batchedProcessingPlacementSchedulingQueueOptions struct {
+	activeQueueRateLimiter  workqueue.TypedRateLimiter[any]
+	batchedQueueRateLimiter workqueue.TypedRateLimiter[any]
+	name                    string
+	movePeriodSeconds       int32
+}
+
+var defaultBatchedProcessingPlacementSchedulingQueueOptions = batchedProcessingPlacementSchedulingQueueOptions{
+	activeQueueRateLimiter:  workqueue.DefaultTypedControllerRateLimiter[any](),
+	batchedQueueRateLimiter: workqueue.DefaultTypedControllerRateLimiter[any](),
+	name:                    "batchedProcessingPlacementSchedulingQueue",
+	movePeriodSeconds:       int32(300), // 5 minutes
+}
+
+// Close shuts down the scheduling queue immediately.
+//
+// Note that items remaining in the active queue might not get processed any more, and items
+// left in the batched queue might not be moved to the active queue any more either.
+func (bq *batchedProcessingPlacementSchedulingQueue) Close() {
+	// Signal the mover goroutine to exit.
+	//
+	// Note that this will trigger the mover goroutine to attempt another key move, but the
+	// active queue might not be able to accept the key any more (which is OK and does not
+	// result in an error).
+	close(bq.moveNow)
+
+	bq.batched.ShutDown()
+	bq.active.ShutDown()
+}
+
+// CloseWithDrain shuts down the scheduling queue and does not return until:
+// a) all the items in the batched queue have been moved to the active queue; and
+// b) all the items in the active queue have been processed.
+func (bq *batchedProcessingPlacementSchedulingQueue) CloseWithDrain() {
+	// Signal that all items in the batched queue should be moved to the active queue right away.
+	close(bq.moveNow)
+
+	// Wait until all the items left in the batched queue have been moved to the active queue.
+	bq.batched.ShutDownWithDrain()
+	// Wait for all the items that are currently being processed by the scheduler to finish.
+	bq.active.ShutDownWithDrain()
+}
+
+// NextPlacementKey returns the next PlacementKey (either clusterResourcePlacementKey or resourcePlacementKey)
+// in the work queue for the scheduler to process.
+func (bq *batchedProcessingPlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) {
+	// This will block on a condition variable if the queue is empty.
+	placementKey, shutdown := bq.active.Get()
+	if shutdown {
+		return "", true
+	}
+	return placementKey.(PlacementKey), false
+}
+
+// Done marks a PlacementKey as done.
+func (bq *batchedProcessingPlacementSchedulingQueue) Done(placementKey PlacementKey) {
+	bq.active.Done(placementKey)
+	// The keys in the batched queue are marked as done as soon as they are moved to the active queue.
+}
+
+// Add adds a PlacementKey to the work queue for immediate processing.
+//
+// Note that this bypasses the rate limiter (if any).
+func (bq *batchedProcessingPlacementSchedulingQueue) Add(placementKey PlacementKey) {
+	bq.active.Add(placementKey)
+}
+
+// AddAfter adds a PlacementKey to the work queue after a set duration for immediate processing.
+//
+// Note that this bypasses the rate limiter (if any).
+func (bq *batchedProcessingPlacementSchedulingQueue) AddAfter(placementKey PlacementKey, duration time.Duration) {
+	bq.active.AddAfter(placementKey, duration)
+}
+
+// AddRateLimited adds a PlacementKey to the work queue after the rate limiter (if any)
+// says that it is OK, for immediate processing.
+func (bq *batchedProcessingPlacementSchedulingQueue) AddRateLimited(placementKey PlacementKey) {
+	bq.active.AddRateLimited(placementKey)
+}
+
+// Forget untracks a PlacementKey from rate limiter(s) (if any) set up with the queue.
+func (bq *batchedProcessingPlacementSchedulingQueue) Forget(placementKey PlacementKey) {
+	bq.active.Forget(placementKey)
+	// The keys in the batched queue are forgotten as soon as they are moved to the active queue.
+}
+
+// AddBatched tracks a PlacementKey and adds such keys in batch later to the work queue when appropriate.
+func (bq *batchedProcessingPlacementSchedulingQueue) AddBatched(placementKey PlacementKey) {
+	bq.batched.Add(placementKey)
+}
+
+// Run starts the scheduling queue.
+func (bq *batchedProcessingPlacementSchedulingQueue) Run() {
+	// Spin up a goroutine to move items periodically from the batched queue to the active queue.
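+	//
+	// The goroutine moves keys whenever the move period elapses or a signal arrives on the
+	// moveNow channel; once the channel has been closed and the batched queue starts shutting
+	// down, it performs one final move and exits.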
+	go func() {
+		timer := time.NewTimer(time.Duration(bq.movePeriodSeconds) * time.Second)
+		for {
+			select {
+			case _, ok := <-bq.moveNow:
+				if !ok && bq.batched.ShuttingDown() {
+					// The batched queue has been shut down, and the moveNow channel has been closed;
+					// now it is safe to assume that after moving all the items from the batched queue to the active queue
+					// this time, the batched queue will be drained.
+					bq.moveAllBatchedItemsToActiveQueue()
+					return
+				}
+
+				// The batched queue might still be running; move all items and re-enter the loop.
+				bq.moveAllBatchedItemsToActiveQueue()
+			case <-timer.C:
+				// The timer has fired; move all items.
+				bq.moveAllBatchedItemsToActiveQueue()
+			}
+
+			// Reset the timer for the next round.
+			timer.Reset(time.Duration(bq.movePeriodSeconds) * time.Second)
+		}
+	}()
+}
+
+func (bq *batchedProcessingPlacementSchedulingQueue) moveAllBatchedItemsToActiveQueue() {
+	keysToMove := []PlacementKey{}
+
+	for bq.batched.Len() > 0 {
+		// Note that the batched queue is an internal object and is only read here by the scheduling queue
+		// itself (i.e., the batched queue has only one reader, though there might be multiple writers);
+		// consequently, if the Len() > 0 check passes, the subsequent Get() call is guaranteed to return
+		// an item (i.e., the call will not block). For simplicity reasons we do not do additional
+		// sanity checks here.
+		placementKey, shutdown := bq.batched.Get()
+		if shutdown {
+			break
+		}
+		keysToMove = append(keysToMove, placementKey.(PlacementKey))
+
+		if len(keysToMove) > maxNumberOfKeysToMoveFromBatchedToActiveQueuePerGo {
+			// The keys popped from the batched queue are not yet added to the active queue; in other words,
+			// they are not yet marked as done; the batched queue will still track them, and adding them
+			// to the batched queue again at this moment will not trigger the batched queue to yield the same
+			// keys again. This implies that at maximum we will be moving a number of keys equal to
+			// the number of placement objects in the system at a time, which should be a finite number.
+			// Still, to be on the safer side here KubeFleet sets a cap on the number of keys to move per go.
+			break
+		}
+	}
+
+	for _, key := range keysToMove {
+		// Mark the keys as done in the batched queue and add the keys to the active queue in batch. Here the
+		// implementation keeps the keys in memory first and does not move keys right after they are popped, as
+		// this pattern risks synchronized processing (i.e., a key is popped from the batched queue, immediately added to the
+		// active queue and gets marked as done by the scheduler, then added back to the batched queue again by
+		// one of the watchers before the key moving attempt is finished, which results in perpetual key moving).
+		bq.active.Add(key)
+		bq.batched.Done(key)
+		bq.batched.Forget(key)
+	}
+}
+
+// NewBatchedProcessingPlacementSchedulingQueue returns a batchedProcessingPlacementSchedulingQueue.
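+//
+// An empty name, a nil rate limiter, or a non-positive move period falls back to the
+// corresponding default (a default controller rate limiter for each queue, and a move
+// period of 300 seconds, i.e., 5 minutes).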
+func NewBatchedProcessingPlacementSchedulingQueue(name string, activeQRateLimiter, batchedQRateLimiter workqueue.TypedRateLimiter[any], movePeriodSeconds int32) PlacementSchedulingQueue {
+	if len(name) == 0 {
+		name = defaultBatchedProcessingPlacementSchedulingQueueOptions.name
+	}
+	if activeQRateLimiter == nil {
+		activeQRateLimiter = defaultBatchedProcessingPlacementSchedulingQueueOptions.activeQueueRateLimiter
+	}
+	if batchedQRateLimiter == nil {
+		batchedQRateLimiter = defaultBatchedProcessingPlacementSchedulingQueueOptions.batchedQueueRateLimiter
+	}
+	if movePeriodSeconds <= 0 {
+		movePeriodSeconds = defaultBatchedProcessingPlacementSchedulingQueueOptions.movePeriodSeconds
+	}
+
+	return &batchedProcessingPlacementSchedulingQueue{
+		active: workqueue.NewTypedRateLimitingQueueWithConfig(activeQRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
+			Name: fmt.Sprintf("%s_Active", name),
+		}),
+		batched: workqueue.NewTypedRateLimitingQueueWithConfig(batchedQRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
+			Name: fmt.Sprintf("%s_Batched", name),
+		}),
+		moveNow:           make(chan struct{}),
+		movePeriodSeconds: movePeriodSeconds,
+	}
+}
diff --git a/pkg/scheduler/queue/batched_test.go b/pkg/scheduler/queue/batched_test.go
new file mode 100644
index 000000000..93056e8d0
--- /dev/null
+++ b/pkg/scheduler/queue/batched_test.go
@@ -0,0 +1,187 @@
+/*
+Copyright 2025 The KubeFleet Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+// TestBatchedProcessingPlacementSchedulingQueue_BasicOps tests the basic ops
+// (Add, Next, Done) of a batchedProcessingPlacementSchedulingQueue.
+func TestBatchedProcessingPlacementSchedulingQueue_BasicOps(t *testing.T) {
+	bq := NewBatchedProcessingPlacementSchedulingQueue("TestOnly", nil, nil, 0)
+	bq.Run()
+
+	keysToAdd := []PlacementKey{"A", "B", "C", "D", "E"}
+	for _, key := range keysToAdd {
+		bq.Add(key)
+	}
+
+	keysRecved := []PlacementKey{}
+	for i := 0; i < len(keysToAdd); i++ {
+		key, closed := bq.NextPlacementKey()
+		if closed {
+			t.Fatalf("Queue closed unexpectedly")
+		}
+		keysRecved = append(keysRecved, key)
+		bq.Done(key)
+		bq.Forget(key)
+	}
+
+	if !cmp.Equal(keysToAdd, keysRecved) {
+		t.Fatalf("Received keys %v, want %v", keysRecved, keysToAdd)
+	}
+
+	bq.Close()
+}
+
+// TestBatchedProcessingPlacementSchedulingQueue_BatchedOps tests the batched ops
+// (AddBatched) of a batchedProcessingPlacementSchedulingQueue.
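+//
+// Keys added via AddBatched should become available to NextPlacementKey only after the
+// configured move period has elapsed.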
+func TestBatchedProcessingPlacementSchedulingQueue_BatchedOps(t *testing.T) {
+	movePeriodSeconds := int32(5) // 5 seconds
+	bq := NewBatchedProcessingPlacementSchedulingQueue("TestOnly", nil, nil, movePeriodSeconds)
+	bq.Run()
+
+	addedTimestamp := time.Now()
+	keysToAddBatched := []PlacementKey{"A", "B", "C"}
+	for _, key := range keysToAddBatched {
+		bq.AddBatched(key)
+	}
+
+	keysRecved := []PlacementKey{}
+	for i := 0; i < len(keysToAddBatched); i++ {
+		key, closed := bq.NextPlacementKey()
+		if closed {
+			t.Fatalf("Queue closed unexpectedly")
+		}
+		keysRecved = append(keysRecved, key)
+		bq.Done(key)
+		bq.Forget(key)
+	}
+
+	if !cmp.Equal(keysToAddBatched, keysRecved) {
+		t.Fatalf("Received keys %v, want %v", keysRecved, keysToAddBatched)
+	}
+	// Allow some buffer time (+1 second).
+	if timeSpent := time.Since(addedTimestamp); timeSpent < time.Duration(movePeriodSeconds-1)*time.Second {
+		t.Fatalf("time to move keys, want no less than %f seconds, got %f seconds", float64(movePeriodSeconds-1), timeSpent.Seconds())
+	}
+}
+
+// TestBatchedProcessingPlacementSchedulingQueue_MoveNow tests the moveNow signal
+// built in a batchedProcessingPlacementSchedulingQueue.
+func TestBatchedProcessingPlacementSchedulingQueue_MoveNow(t *testing.T) {
+	movePeriodSeconds := int32(10) // 10 seconds
+	bq := NewBatchedProcessingPlacementSchedulingQueue("TestOnly", nil, nil, movePeriodSeconds)
+	bq.Run()
+
+	keysToAddBatched := []PlacementKey{"A", "B", "C"}
+	for _, key := range keysToAddBatched {
+		bq.AddBatched(key)
+	}
+
+	// Send a move now signal.
+	bqStruct, ok := bq.(*batchedProcessingPlacementSchedulingQueue)
+	if !ok {
+		t.Fatalf("Failed to cast to batchedProcessingPlacementSchedulingQueue")
+	}
+	bqStruct.moveNow <- struct{}{}
+
+	moveNowTriggeredTimestamp := time.Now()
+	keysRecved := []PlacementKey{}
+	for i := 0; i < len(keysToAddBatched); i++ {
+		key, closed := bq.NextPlacementKey()
+		if closed {
+			t.Fatalf("Queue closed unexpectedly")
+		}
+		keysRecved = append(keysRecved, key)
+		bq.Done(key)
+		bq.Forget(key)
+	}
+
+	if !cmp.Equal(keysToAddBatched, keysRecved) {
+		t.Fatalf("Received keys %v, want %v", keysRecved, keysToAddBatched)
+	}
+	// Allow some buffer time (1 second).
+	if timeSpent := time.Since(moveNowTriggeredTimestamp); timeSpent > time.Second {
+		t.Fatalf("time to move keys after move now triggered, want no more than %f seconds, got %f seconds", 1.0, timeSpent.Seconds())
+	}
+}
+
+// TestBatchedProcessingPlacementSchedulingQueue_CloseWithDrain tests the CloseWithDrain
+// method of a batchedProcessingPlacementSchedulingQueue.
+func TestBatchedProcessingPlacementSchedulingQueue_CloseWithDrain(t *testing.T) {
+	movePeriodSeconds := int32(600) // 10 minutes
+	bq := NewBatchedProcessingPlacementSchedulingQueue("TestOnly", nil, nil, movePeriodSeconds)
+	bq.Run()
+
+	keysToAdd := []PlacementKey{"A", "B", "C"}
+	for _, key := range keysToAdd {
+		bq.Add(key)
+	}
+
+	keysToAddBatched := []PlacementKey{"D", "E", "F"}
+	for _, key := range keysToAddBatched {
+		bq.AddBatched(key)
+	}
+
+	// Send a move now signal.
+	bqStruct, ok := bq.(*batchedProcessingPlacementSchedulingQueue)
+	if !ok {
+		t.Fatalf("Failed to cast to batchedProcessingPlacementSchedulingQueue")
+	}
+	bqStruct.moveNow <- struct{}{}
+
+	keysRecved := []PlacementKey{}
+	for i := 0; i < len(keysToAdd)+len(keysToAddBatched); i++ {
+		key, closed := bq.NextPlacementKey()
+		if closed {
+			t.Fatalf("Queue closed unexpectedly")
+		}
+		keysRecved = append(keysRecved, key)
+		// Do not yet mark the keys as Done.
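+		// Keeping the keys in flight forces the CloseWithDrain call below to block until the
+		// goroutine marks them as Done.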
+ } + + timerPeriodSeconds := int32(5) + go func() { + timer := time.NewTimer(time.Duration(timerPeriodSeconds) * time.Second) + <-timer.C + // Mark all keys as Done after 5 seconds. + for _, key := range keysRecved { + bq.Done(key) + bq.Forget(key) + } + }() + + // Close and drain the queue; this should block until all keys are marked Done. + closeWithDrainTimestamp := time.Now() + bq.CloseWithDrain() + + wantKeys := make([]PlacementKey, 0, len(keysToAdd)+len(keysToAddBatched)) + wantKeys = append(wantKeys, keysToAdd...) + wantKeys = append(wantKeys, keysToAddBatched...) + if !cmp.Equal(wantKeys, keysRecved) { + t.Fatalf("Received keys %v, want %v", keysRecved, wantKeys) + } + // Allow some buffer time (+1 second). + if timeSpent := time.Since(closeWithDrainTimestamp); timeSpent > time.Duration(timerPeriodSeconds+1)*time.Second { + t.Fatalf("time to close with drain, want no more than %f seconds, got %f seconds", float64(timerPeriodSeconds+1), timeSpent.Seconds()) + } +} diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 552fd64fc..82ecdbf9d 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -20,8 +20,6 @@ package queue import ( "time" - - "k8s.io/client-go/util/workqueue" ) // PlacementKey is the unique identifier for a Placement checked into a scheduling queue. @@ -44,6 +42,10 @@ type PlacementSchedulingQueueWriter interface { AddRateLimited(placementKey PlacementKey) // AddAfter adds a PlacementKey to the work queue after a set duration. AddAfter(placementKey PlacementKey, duration time.Duration) + // AddBatched tracks a PlacementKey and adds such keys in batch later to the work queue when appropriate. + // + // This is most helpful in cases where certain changes do not require immediate processing by the scheduler. + AddBatched(placementKey PlacementKey) } // PlacementSchedulingQueue is an interface which queues PlacementKeys for the scheduler @@ -65,124 +67,3 @@ type PlacementSchedulingQueue interface { // Forget untracks a PlacementKey from rate limiter(s) (if any) set up with the queue. Forget(placementKey PlacementKey) } - -// simplePlacementSchedulingQueue is a simple implementation of -// PlacementSchedulingQueue. -// -// At this moment, one single workqueue would suffice, as sources such as the cluster watcher, -// the binding watcher, etc., can catch all changes that need the scheduler's attention. -// In the future, when more features, e.g., inter-placement affinity/anti-affinity, are added, -// more queues, such as a backoff queue, might become necessary. -type simplePlacementSchedulingQueue struct { - active workqueue.TypedRateLimitingInterface[any] -} - -// Verify that simplePlacementSchedulingQueue implements -// PlacementSchedulingQueue at compile time. -var _ PlacementSchedulingQueue = &simplePlacementSchedulingQueue{} - -// simplePlacementSchedulingQueueOptions are the options for the -// simplePlacementSchedulingQueue. -type simplePlacementSchedulingQueueOptions struct { - rateLimiter workqueue.TypedRateLimiter[any] - name string -} - -// Option is the function that configures the simplePlacementSchedulingQueue. -type Option func(*simplePlacementSchedulingQueueOptions) - -var defaultSimplePlacementSchedulingQueueOptions = simplePlacementSchedulingQueueOptions{ - rateLimiter: workqueue.DefaultTypedControllerRateLimiter[any](), - name: "placementSchedulingQueue", -} - -// WithRateLimiter sets a rate limiter for the workqueue. 
-func WithRateLimiter(rateLimiter workqueue.TypedRateLimiter[any]) Option { - return func(o *simplePlacementSchedulingQueueOptions) { - o.rateLimiter = rateLimiter - } -} - -// WithName sets a name for the workqueue. -func WithName(name string) Option { - return func(o *simplePlacementSchedulingQueueOptions) { - o.name = name - } -} - -// Run starts the scheduling queue. -// -// At this moment, Run is an no-op as there is only one queue present; in the future, -// when more queues are added, Run would start goroutines that move items between queues as -// appropriate. -func (sq *simplePlacementSchedulingQueue) Run() {} - -// Close shuts down the scheduling queue immediately. -func (sq *simplePlacementSchedulingQueue) Close() { - sq.active.ShutDown() -} - -// CloseWithDrain shuts down the scheduling queue and returns until all items are processed. -func (sq *simplePlacementSchedulingQueue) CloseWithDrain() { - sq.active.ShutDownWithDrain() -} - -// NextPlacementKey returns the next PlacementKey (either clusterResourcePlacementKey or resourcePlacementKey) -// in the work queue for the scheduler to process. -// -// Note that for now the queue simply wraps a work queue, and consider its state (whether it -// is shut down or not) as its own closedness. In the future, when more queues are added, the -// queue implementation must manage its own state. -func (sq *simplePlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) { - // This will block on a condition variable if the queue is empty. - placementKey, shutdown := sq.active.Get() - if shutdown { - return "", true - } - return placementKey.(PlacementKey), false -} - -// Done marks a PlacementKey as done. -func (sq *simplePlacementSchedulingQueue) Done(placementKey PlacementKey) { - sq.active.Done(placementKey) -} - -// Add adds a PlacementKey to the work queue. -// -// Note that this bypasses the rate limiter (if any). -func (sq *simplePlacementSchedulingQueue) Add(placementKey PlacementKey) { - sq.active.Add(placementKey) -} - -// AddRateLimited adds a PlacementKey to the work queue after the rate limiter (if any) -// says that it is OK. -func (sq *simplePlacementSchedulingQueue) AddRateLimited(placementKey PlacementKey) { - sq.active.AddRateLimited(placementKey) -} - -// AddAfter adds a PlacementKey to the work queue after a set duration. -// -// Note that this bypasses the rate limiter (if any) -func (sq *simplePlacementSchedulingQueue) AddAfter(placementKey PlacementKey, duration time.Duration) { - sq.active.AddAfter(placementKey, duration) -} - -// Forget untracks a PlacementKey from rate limiter(s) (if any) set up with the queue. -func (sq *simplePlacementSchedulingQueue) Forget(placementKey PlacementKey) { - sq.active.Forget(placementKey) -} - -// NewSimplePlacementSchedulingQueue returns a -// simplePlacementSchedulingQueue. -func NewSimplePlacementSchedulingQueue(opts ...Option) PlacementSchedulingQueue { - options := defaultSimplePlacementSchedulingQueueOptions - for _, opt := range opts { - opt(&options) - } - - return &simplePlacementSchedulingQueue{ - active: workqueue.NewTypedRateLimitingQueueWithConfig(options.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{ - Name: options.name, - }), - } -} diff --git a/pkg/scheduler/queue/simple.go b/pkg/scheduler/queue/simple.go new file mode 100644 index 000000000..8721bc517 --- /dev/null +++ b/pkg/scheduler/queue/simple.go @@ -0,0 +1,133 @@ +/* +Copyright 2025 The KubeFleet Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+// simplePlacementSchedulingQueue is a simple implementation of
+// PlacementSchedulingQueue.
+//
+// This implementation is essentially a thin wrapper around one rate-limiting
+// workqueue, which queues all placement keys indiscriminately for processing.
+type simplePlacementSchedulingQueue struct {
+	active workqueue.TypedRateLimitingInterface[any]
+}
+
+// Verify that simplePlacementSchedulingQueue implements
+// PlacementSchedulingQueue at compile time.
+var _ PlacementSchedulingQueue = &simplePlacementSchedulingQueue{}
+
+// simplePlacementSchedulingQueueOptions are the options for the
+// simplePlacementSchedulingQueue.
+type simplePlacementSchedulingQueueOptions struct {
+	rateLimiter workqueue.TypedRateLimiter[any]
+	name        string
+}
+
+var defaultSimplePlacementSchedulingQueueOptions = simplePlacementSchedulingQueueOptions{
+	rateLimiter: workqueue.DefaultTypedControllerRateLimiter[any](),
+	name:        "simplePlacementSchedulingQueue",
+}
+
+// Run starts the scheduling queue.
+//
+// At this moment, Run is a no-op as there is only one queue present; in the future,
+// when more queues are added, Run would start goroutines that move items between queues as
+// appropriate.
+func (sq *simplePlacementSchedulingQueue) Run() {}
+
+// Close shuts down the scheduling queue immediately.
+func (sq *simplePlacementSchedulingQueue) Close() {
+	sq.active.ShutDown()
+}
+
+// CloseWithDrain shuts down the scheduling queue and does not return until all items are processed.
+func (sq *simplePlacementSchedulingQueue) CloseWithDrain() {
+	sq.active.ShutDownWithDrain()
+}
+
+// NextPlacementKey returns the next PlacementKey (either clusterResourcePlacementKey or resourcePlacementKey)
+// in the work queue for the scheduler to process.
+//
+// Note that for now the queue simply wraps a work queue and treats the work queue's state
+// (whether it is shut down or not) as its own. In the future, when more queues are added, the
+// queue implementation must manage its own state.
+func (sq *simplePlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) {
+	// This will block on a condition variable if the queue is empty.
+	placementKey, shutdown := sq.active.Get()
+	if shutdown {
+		return "", true
+	}
+	return placementKey.(PlacementKey), false
+}
+
+// Done marks a PlacementKey as done.
+func (sq *simplePlacementSchedulingQueue) Done(placementKey PlacementKey) {
+	sq.active.Done(placementKey)
+}
+
+// Add adds a PlacementKey to the work queue.
+//
+// Note that this bypasses the rate limiter (if any).
+func (sq *simplePlacementSchedulingQueue) Add(placementKey PlacementKey) {
+	sq.active.Add(placementKey)
+}
+
+// AddRateLimited adds a PlacementKey to the work queue after the rate limiter (if any)
+// says that it is OK.
+func (sq *simplePlacementSchedulingQueue) AddRateLimited(placementKey PlacementKey) { + sq.active.AddRateLimited(placementKey) +} + +// AddAfter adds a PlacementKey to the work queue after a set duration. +// +// Note that this bypasses the rate limiter (if any). +func (sq *simplePlacementSchedulingQueue) AddAfter(placementKey PlacementKey, duration time.Duration) { + sq.active.AddAfter(placementKey, duration) +} + +// AddBatched tracks a PlacementKey and adds such keys in batch later to the work queue when appropriate. +// +// For the simple queue implementation, this is equivalent to Add. +func (sq *simplePlacementSchedulingQueue) AddBatched(placementKey PlacementKey) { + sq.active.Add(placementKey) +} + +// Forget untracks a PlacementKey from rate limiter(s) (if any) set up with the queue. +func (sq *simplePlacementSchedulingQueue) Forget(placementKey PlacementKey) { + sq.active.Forget(placementKey) +} + +// NewSimplePlacementSchedulingQueue returns a simplePlacementSchedulingQueue. +func NewSimplePlacementSchedulingQueue(name string, rateLimiter workqueue.TypedRateLimiter[any]) PlacementSchedulingQueue { + if len(name) == 0 { + name = defaultSimplePlacementSchedulingQueueOptions.name + } + if rateLimiter == nil { + rateLimiter = defaultSimplePlacementSchedulingQueueOptions.rateLimiter + } + + return &simplePlacementSchedulingQueue{ + active: workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{ + Name: name, + }), + } +} diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/simple_test.go similarity index 86% rename from pkg/scheduler/queue/queue_test.go rename to pkg/scheduler/queue/simple_test.go index 4a100c9ff..e7fe1993a 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/simple_test.go @@ -22,10 +22,10 @@ import ( "github.com/google/go-cmp/cmp" ) -// TestSimplePlacementSchedulingQueueBasicOps tests the basic ops +// TestSimplePlacementSchedulingQueue_BasicOps tests the basic ops // (Add, Next, Done) of a simpleClusterResourcePlacementSchedulingQueue. -func TestSimplePlacementSchedulingQueueBasicOps(t *testing.T) { - sq := NewSimplePlacementSchedulingQueue() +func TestSimplePlacementSchedulingQueue_BasicOps(t *testing.T) { + sq := NewSimplePlacementSchedulingQueue("", nil) sq.Run() keysToAdd := []PlacementKey{"A", "B", "C", "D", "E"} diff --git a/pkg/scheduler/watchers/binding/suite_test.go b/pkg/scheduler/watchers/binding/suite_test.go index d2400ac61..91f6d31a6 100644 --- a/pkg/scheduler/watchers/binding/suite_test.go +++ b/pkg/scheduler/watchers/binding/suite_test.go @@ -93,7 +93,7 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred(), "Failed to create controller manager") - schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue() + schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue("", nil) // Create ClusterResourceBinding watcher crbReconciler. 
crbReconciler := &Reconciler{ diff --git a/pkg/scheduler/watchers/membercluster/suite_test.go b/pkg/scheduler/watchers/membercluster/suite_test.go index 5eacab0f0..c807b1ee9 100644 --- a/pkg/scheduler/watchers/membercluster/suite_test.go +++ b/pkg/scheduler/watchers/membercluster/suite_test.go @@ -184,7 +184,7 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred(), "Failed to create controller manager") - schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue() + schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue("", nil) reconciler := Reconciler{ Client: hubClient, diff --git a/pkg/scheduler/watchers/membercluster/watcher.go b/pkg/scheduler/watchers/membercluster/watcher.go index c2426822f..c504b496e 100644 --- a/pkg/scheduler/watchers/membercluster/watcher.go +++ b/pkg/scheduler/watchers/membercluster/watcher.go @@ -171,7 +171,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu "Enqueueing placement for scheduler processing", "memberCluster", memberClusterRef, "placement", klog.KObj(placement)) - r.SchedulerWorkQueue.Add(controller.GetObjectKeyFromObj(placement)) + // TO-DO (chenyu1): at this moment, the scheduler still uses a simple queue implementation; as a result, + // the placement keys will be added to the queue immediately even with the AddBatched() call. Switch + // to a batched processing queue implementation later to take advantage of the batched processing feature. + r.SchedulerWorkQueue.AddBatched(controller.GetObjectKeyFromObj(placement)) } // The reconciliation loop completes. diff --git a/pkg/scheduler/watchers/placement/suite_test.go b/pkg/scheduler/watchers/placement/suite_test.go index cdd898377..cad0ee66e 100644 --- a/pkg/scheduler/watchers/placement/suite_test.go +++ b/pkg/scheduler/watchers/placement/suite_test.go @@ -93,7 +93,7 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred(), "Failed to create controller manager") - schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue() + schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue("", nil) crpReconciler := &Reconciler{ Client: hubClient, diff --git a/pkg/scheduler/watchers/schedulingpolicysnapshot/suite_test.go b/pkg/scheduler/watchers/schedulingpolicysnapshot/suite_test.go index 4ffbbbc2b..96f958056 100644 --- a/pkg/scheduler/watchers/schedulingpolicysnapshot/suite_test.go +++ b/pkg/scheduler/watchers/schedulingpolicysnapshot/suite_test.go @@ -93,7 +93,7 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred(), "Failed to create controller manager") - schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue() + schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue("", nil) reconciler := &Reconciler{ Client: hubClient, diff --git a/test/scheduler/suite_test.go b/test/scheduler/suite_test.go index 144998168..d4484a106 100644 --- a/test/scheduler/suite_test.go +++ b/test/scheduler/suite_test.go @@ -571,7 +571,7 @@ func beforeSuiteForProcess1() []byte { Expect(err).NotTo(HaveOccurred(), "Failed to create controller manager") // Spin up a scheduler work queue. - schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue() + schedulerWorkQueue := queue.NewSimplePlacementSchedulingQueue("", nil) // Build a custom cluster eligibility checker. clusterEligibilityChecker := clustereligibilitychecker.New(