Conversation

@michaelawyu (Collaborator)

Description of your changes

This PR adds a scheduler work queue implementation that allows certain change events (e.g., member cluster CR refreshes) to be processed periodically in batches rather than immediately, saving unnecessary processing cycles.

I have:

  • Run make reviewable to ensure this PR is ready for review.

How has this code been tested

  • Unit tests

Special notes for your reviewer

@michaelawyu
Copy link
Collaborator Author

Note: the implementation is not yet enabled; for now the scheduler still uses the original simple queue implementation.

// Add the keys to the active queue in batch. Here the implementation does not move keys one by one
// right after they are popped as this pattern risks synchronized processing (i.e., a key is popped
// from the batched queue, immediately added to the active queue and gets processed, then added
// back to the batched queue again by one of the watchers before the key moving attempt is finished,
Collaborator

Aren't you still moving them one by one here? In between two calls of bq.active.Add(key), a watcher goroutine can still run and add the key back. Are you arguing that because this tight loop runs very fast, this is highly unlikely?

@michaelawyu (Collaborator Author) Dec 10, 2025

Hi Wei! It is totally OK for a watcher to add a key back while we are moving it. K8s queues are dirty-writing queues: if we Get() a key from the queue and then Add() the same key without marking it as done, the queue will hold the key (mark it as dirty) instead of re-queueing it. The dirty key is only re-queued once it has been marked as done.
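
To illustrate (a minimal, self-contained sketch using the typed workqueue from k8s.io/client-go; this is not code from the PR):

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.NewTyped[string]()

    q.Add("cluster-1")
    key, _ := q.Get() // the key is now in flight (being processed)

    // Re-adding the same key before Done() marks it dirty instead of
    // queueing a second copy; the queue length stays at 0.
    q.Add("cluster-1")
    fmt.Println(q.Len()) // 0

    // Marking the key done re-queues the dirty copy exactly once.
    q.Done(key)
    fmt.Println(q.Len()) // 1
}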

Collaborator Author

For this part of the code, the comment is implying that:

a) if we do something like this:

for {
    k, _ := batched.Get()
    active.Add(k)
}

there exists a corner case where the consumer (the scheduler) can process a key very quickly and mark it as done before the moving process completes; if the key has been marked as dirty, it will be pushed to the batched queue again, and the same key will be moved a second time while the moving process is still running.

b) instead, we keep the keys in memory first, and mark them as done only after the moving process itself has concluded. This guarantees that each key is moved only once.
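
A rough sketch of pattern (b), reusing the bq.batched/bq.active fields from this PR (moveAll is an illustrative name, not necessarily the PR's code):

func (bq *batchedProcessingPlacementSchedulingQueue) moveAll() {
    // Drain the batched queue into memory first, without marking anything done.
    keys := make([]any, 0, bq.batched.Len())
    for bq.batched.Len() > 0 {
        key, shutdown := bq.batched.Get()
        if shutdown {
            return
        }
        keys = append(keys, key)
    }

    // Add every key to the active queue, then mark it done. A key re-added
    // by a watcher in the meantime stays dirty until Done() runs, so it can
    // re-enter the batched queue only after this round has concluded.
    for _, key := range keys {
        bq.active.Add(key)
        bq.batched.Done(key)
    }
}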

Collaborator Author

I've tweaked the comment/code to be clearer; there's also a sameness situation that has been addressed.

// NextPlacementKey returns the next PlacementKey (either clusterResourcePlacementKey or resourcePlacementKey)
// in the work queue for the scheduler to process.
func (bq *batchedProcessingPlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) {
    // This will block on a condition variable if the queue is empty.
Collaborator

Does this mean that the caller will wait up to the timer interval to get the next key? Isn't that 300 seconds?

@michaelawyu (Collaborator Author) Dec 10, 2025

Hi Wei! The scheduler queue consists of two separate queues:

  • The active queue handles changes that need immediate processing
  • The batched queue handles changes that can be batched and processed periodically; keys in the batched queue are periodically moved to the active queue.

It is up to the caller (watchers, etc.) to decide which queue to use. At this moment only member cluster changes are sent to the batched queue (via AddBatched()); other watchers use the active queue (via Add()).
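
In other words (illustrative only; queue and key here are placeholders):

// Member cluster changes tolerate some delay, so the watcher batches them.
queue.AddBatched(key)

// Other changes need immediate scheduling, so those watchers enqueue directly.
queue.Add(key)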

Collaborator Author

NextPlacementKey() will only block when the active queue is empty.
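
A minimal sketch of that behavior, assuming the typed workqueue's Get() semantics (not necessarily the exact code in this PR):

func (bq *batchedProcessingPlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) {
    // Get() blocks on a condition variable only while the active queue is
    // empty; keys sitting in the batched queue do not wake it up.
    item, shutdown := bq.active.Get()
    if shutdown {
        var zero PlacementKey
        return zero, true
    }
    return item.(PlacementKey), false
}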

if !ok {
    t.Fatalf("Failed to cast to batchedProcessingPlacementSchedulingQueue")
}
bqStruct.moveNow <- struct{}{}
Collaborator

Is this intended to be a public method that the scheduler will call? If so, should it be on the public interface?

Collaborator Author

Hi Wei! At this moment I cannot think of a case where the caller (the scheduler) needs to move all the keys immediately, so I haven't exposed the channel. It is currently only used to signal queue shutdowns (and for testing purposes).

}

// Reset the timer for the next round.
timer.Reset(time.Duration(bq.movePeriodSeconds) * time.Second)
Collaborator

Why do we reset the timer in the case _, closed := <-bq.moveNow: branch?

Collaborator Author

Hi Wei! The moving process is set to wait for 300 seconds after each moving attempt. I thought about using a tick-based mechanism (i.e., instead of waiting 300 seconds after each move, do a move every 300 seconds regardless), but it has its own concerns.

Collaborator Author

(missing ticks, two consecutive moves if we are unlucky, etc.)
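
A sketch of the wait-after-each-move loop being described (moveAllBatchedKeys is a hypothetical name for the moving step; this is not the PR's exact code):

func (bq *batchedProcessingPlacementSchedulingQueue) run() {
    timer := time.NewTimer(time.Duration(bq.movePeriodSeconds) * time.Second)
    defer timer.Stop()
    for {
        select {
        case <-timer.C:
            bq.moveAllBatchedKeys()
        case _, ok := <-bq.moveNow:
            if !ok {
                // moveNow is closed on queue shutdown.
                return
            }
            bq.moveAllBatchedKeys()
        }
        // Rearm only after the move finishes, so the next move is always a
        // full period away. A ticker could instead fire during a slow move
        // (a missed tick) or right after one (two back-to-back moves).
        timer.Reset(time.Duration(bq.movePeriodSeconds) * time.Second)
    }
}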

Signed-off-by: michaelawyu <[email protected]>
codecov bot commented Dec 10, 2025

Codecov Report

❌ Patch coverage is 79.20792% with 21 lines in your changes missing coverage. Please review.

Files with missing lines          Patch %   Lines
pkg/scheduler/queue/batched.go    78.26%    10 Missing and 5 partials ⚠️
pkg/scheduler/queue/simple.go     80.64%    5 Missing and 1 partial ⚠️


}

// NewSimplePlacementSchedulingQueue returns a simplePlacementSchedulingQueue.
func NewSimplePlacementSchedulingQueue(name string, rateLimiter workqueue.TypedRateLimiter[any]) PlacementSchedulingQueue {
Contributor

Why change this function signature? AFAIK, this is not part of the new common interface.

Collaborator Author

Hi Ryan! Before, we were using option-function-based initialization (e.g., WithName, WithRateLimiter), but now that we have two queue implementations in the same package, this pattern is a bit difficult to follow. I could move them to separate child packages if you prefer to continue using this pattern.
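
For reference, the option-function style in question looks roughly like this (the field names here are illustrative, not the repository's actual ones):

type Option func(*simplePlacementSchedulingQueue)

func WithName(name string) Option {
    return func(q *simplePlacementSchedulingQueue) { q.name = name }
}

func WithRateLimiter(r workqueue.TypedRateLimiter[any]) Option {
    return func(q *simplePlacementSchedulingQueue) { q.rateLimiter = r }
}

With two implementations in one package, each would need its own option type (or a shared one with fields that only apply to one implementation), which is what plain constructor parameters sidestep.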

active  workqueue.TypedRateLimitingInterface[any]
batched workqueue.TypedRateLimitingInterface[any]

moveNow chan struct{}
Contributor

This channel is only closed and never used to pass anything; I wonder why it's called moveNow?

Collaborator Author

Hi Ryan! For testing we did send items to the channel to verify the behavior. Would you prefer a name like done, quit, or stop?
