feat: add implementation for scheduler work queues with batch processing support #373
Conversation
Signed-off-by: michaelawyu <[email protected]>
Note: the implementation is not yet enabled; for now the scheduler still uses the original simple queue implementation.
pkg/scheduler/queue/batched.go
Outdated
    // Add the keys to the active queue in batch. Here the implementation does not move keys one by one
    // right after they are popped as this pattern risks synchronized processing (i.e., a key is popped
    // from the batched queue, immediately added to the active queue and gets processed, then added
    // back to the batched queue again by one of the watchers before the key moving attempt is finished,
aren't you still moving them one by one here, since between two calls of `bq.active.Add(key)` a watcher goroutine can still run and add the key back? Are you arguing that because this tight loop runs very fast, this is highly unlikely?
Hi Wei! It is totally OK for a watcher to add a key back when we are moving it. K8s queues are dirty-writing queues, which means that if we Get() a key from the queue first, then Add() the same key without marking the key as done, the queue will hold the key (mark it as dirty) instead of pushing it to the queue. The dirty key will only be added when the key has been marked as done.
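The dirty-writing behavior described here can be illustrated with a minimal, single-goroutine model of the client-go workqueue semantics. This is a sketch for illustration only, not the real `k8s.io/client-go/util/workqueue` implementation: an item re-added while it is being processed is only marked dirty, and is re-queued when `Done` is called.

```go
package main

import "fmt"

// dirtyQueue is a minimal model of the workqueue semantics discussed
// above. Illustrative only; the real library also handles rate
// limiting, locking, and shutdown.
type dirtyQueue struct {
	queue      []string
	dirty      map[string]bool
	processing map[string]bool
}

func newDirtyQueue() *dirtyQueue {
	return &dirtyQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

func (q *dirtyQueue) Add(key string) {
	if q.dirty[key] {
		return // already pending
	}
	q.dirty[key] = true
	if q.processing[key] {
		return // held back (dirty) until Done(key) is called
	}
	q.queue = append(q.queue, key)
}

func (q *dirtyQueue) Get() string {
	key := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key
}

func (q *dirtyQueue) Done(key string) {
	delete(q.processing, key)
	if q.dirty[key] {
		// The key was re-added mid-processing; queue it now.
		q.queue = append(q.queue, key)
	}
}

func (q *dirtyQueue) Len() int { return len(q.queue) }

func main() {
	q := newDirtyQueue()
	q.Add("cluster-a")
	key := q.Get()
	q.Add(key) // re-added before Done: held as dirty, not queued
	fmt.Println("len before Done:", q.Len()) // len before Done: 0
	q.Done(key)
	fmt.Println("len after Done:", q.Len()) // len after Done: 1
}
```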
For this part of the code, the comment is implying that:
a) if we do something like this:
    for ... {
        k := batched.Get()
        active.Add(k)
    }

there exists a corner case where the consumer (the scheduler) can get really fast and mark the key as done before the moving process completes; if the key has been marked as dirty, it will be pushed to the batched queue again, and the same key will be moved again while the moving process is still in progress.
b) instead, we keep the keys in memory first, and mark them as done only after the moving process itself has concluded. This guarantees that each key is moved only once.
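The pattern in (b) can be sketched as a drain-then-add-then-done loop. The names below (`queueOps`, `moveAllBatchedToActive`, `simpleQueue`) are illustrative stand-ins mirroring the discussion, not Fleet's actual implementation:

```go
package main

import "fmt"

// queueOps captures just the operations the move loop needs; the method
// names mirror client-go's workqueue, but this is an illustrative
// stand-in.
type queueOps interface {
	Add(key string)
	Get() string
	Done(key string)
	Len() int
}

// simpleQueue is a FIFO with no dirty tracking, enough for the demo.
type simpleQueue struct{ items []string }

func (q *simpleQueue) Add(key string) { q.items = append(q.items, key) }
func (q *simpleQueue) Get() string {
	key := q.items[0]
	q.items = q.items[1:]
	return key
}
func (q *simpleQueue) Done(string) {}
func (q *simpleQueue) Len() int    { return len(q.items) }

// moveAllBatchedToActive drains the batched queue into memory first,
// adds every key to the active queue, and only then marks the keys as
// done on the batched queue. Marking done last means a key a watcher
// re-adds mid-move stays dirty and is re-queued for the next round,
// so each key is moved at most once per round.
func moveAllBatchedToActive(batched, active queueOps) {
	keys := make([]string, 0, batched.Len())
	for batched.Len() > 0 {
		keys = append(keys, batched.Get())
	}
	for _, k := range keys {
		active.Add(k)
	}
	for _, k := range keys {
		batched.Done(k)
	}
}

func main() {
	batched, active := &simpleQueue{}, &simpleQueue{}
	batched.Add("crp-1")
	batched.Add("crp-2")
	moveAllBatchedToActive(batched, active)
	fmt.Println(batched.Len(), active.Len()) // 0 2
}
```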
I've tweaked the comment/code to be more clear; there's also a sameness situation that has been addressed.
    // NextPlacementKey returns the next PlacementKey (either clusterResourcePlacementKey or resourcePlacementKey)
    // in the work queue for the scheduler to process.
    func (bq *batchedProcessingPlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) {
        // This will block on a condition variable if the queue is empty.
this means that the caller will wait up to the timer interval to get the next key? Isn't that 300 seconds?
Hi Wei! The scheduler queue consists of two separate queues:
- The `active` queue handles changes that need immediate processing.
- The `batched` queue handles changes that can be batched and processed periodically; we periodically move keys in the `batched` queue to the `active` queue.

It is up to the caller (watchers, etc.) to decide which queue to use. At this moment only member cluster changes are sent to the batched queue (`AddBatched()`); other watchers use the active queue (`Add()`).
NextPlacementKey() will only block when the active queue is empty.
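The routing behavior described here can be sketched with a toy channel-based model. `toyTwoQueue` and its method bodies are illustrative assumptions, not Fleet's `batchedProcessingPlacementSchedulingQueue`; only the method names `Add`, `AddBatched`, and `NextPlacementKey` come from the discussion:

```go
package main

import (
	"fmt"
	"sync"
)

type PlacementKey string

// toyTwoQueue is a minimal illustration of the two-queue design: Add
// feeds the active queue directly, while AddBatched parks keys until
// moveBatchedToActive runs.
type toyTwoQueue struct {
	mu      sync.Mutex
	active  chan PlacementKey
	batched []PlacementKey
}

func newToyTwoQueue() *toyTwoQueue {
	return &toyTwoQueue{active: make(chan PlacementKey, 64)}
}

// Add routes a key straight to the active queue for immediate processing.
func (q *toyTwoQueue) Add(key PlacementKey) { q.active <- key }

// AddBatched parks a key until the next periodic move.
func (q *toyTwoQueue) AddBatched(key PlacementKey) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.batched = append(q.batched, key)
}

// moveBatchedToActive moves all parked keys to the active queue; in the
// real implementation this runs on a timer.
func (q *toyTwoQueue) moveBatchedToActive() {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, k := range q.batched {
		q.active <- k
	}
	q.batched = nil
}

// NextPlacementKey blocks only while the active queue is empty.
func (q *toyTwoQueue) NextPlacementKey() PlacementKey { return <-q.active }

func main() {
	q := newToyTwoQueue()
	q.AddBatched("memberCluster-1")    // waits for the next move
	q.Add("crp-1")                     // processed immediately
	fmt.Println(q.NextPlacementKey())  // crp-1
	q.moveBatchedToActive()
	fmt.Println(q.NextPlacementKey())  // memberCluster-1
}
```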
    if !ok {
        t.Fatalf("Failed to cast to batchedProcessingPlacementSchedulingQueue")
    }
    bqStruct.moveNow <- struct{}{}
Is this intended to be a public method that the scheduler will call? If so it should be on the public interface?
Hi Wei! At this moment I couldn't think of a case where the caller (the scheduler) needs to move all the keys immediately so I haven't exposed the channel. It is currently only used to signal queue shutdowns (and for testing purposes).
    // Reset the timer for the next round.
    timer.Reset(time.Duration(bq.movePeriodSeconds) * time.Second)
why do we reset the timer for `case _, closed := <-bq.moveNow:`?
Hi Wei! The moving process is set to wait for 300 seconds after each moving attempt; I thought about using a tick-based mechanism (instead of waiting for 300 seconds after each move, do a move every 300 seconds regardless) but it has its own concerns.
(missing ticks, two consecutive moves if we are unlucky, etc.)
Signed-off-by: michaelawyu <[email protected]>
    // NewSimplePlacementSchedulingQueue returns a simplePlacementSchedulingQueue.
    func NewSimplePlacementSchedulingQueue(name string, rateLimiter workqueue.TypedRateLimiter[any]) PlacementSchedulingQueue {
why change this function signature? AFAIK, this is not part of the new common interface
Hi Ryan! Before, we were using function-based initialization, e.g., `WithName`, `WithRateLimiter`, but now that we have two queue implementations (in the same package), this pattern is a bit difficult to follow. I could move them to separate child packages if you prefer to continue using this pattern.
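For context, the function-based (functional options) initialization style mentioned here generally looks like the sketch below. The `WithName` option name comes from the discussion; everything else (`options`, `newNamedQueue`, the default value) is an illustrative assumption, not the package's actual code:

```go
package main

import "fmt"

// options holds the configurable fields; in the real package a rate
// limiter would live here as well.
type options struct {
	name string
}

// Option is a function that mutates the option set.
type Option func(*options)

// WithName overrides the default queue name.
func WithName(name string) Option {
	return func(o *options) { o.name = name }
}

// namedQueue is a placeholder for the constructed queue.
type namedQueue struct {
	name string
}

// newNamedQueue applies each option over the defaults, which is what
// makes this pattern awkward to follow once two implementations share
// one package: every constructor needs its own compatible option set.
func newNamedQueue(opts ...Option) *namedQueue {
	o := options{name: "scheduler-queue"} // default
	for _, apply := range opts {
		apply(&o)
	}
	return &namedQueue{name: o.name}
}

func main() {
	fmt.Println(newNamedQueue().name)                 // scheduler-queue
	fmt.Println(newNamedQueue(WithName("custom")).name) // custom
}
```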
    active  workqueue.TypedRateLimitingInterface[any]
    batched workqueue.TypedRateLimitingInterface[any]

    moveNow chan struct{}
this channel is only closed and never used to pass anything, I wonder why it's called moveNow?
Hi Ryan! For testing we did send items to the channel to verify the behavior. Would you prefer a name like `done`, `quit`, or `stop`?
Description of your changes
This PR adds an implementation for scheduler work queues that allows certain change events (e.g., member cluster CR refreshes) to be processed in batch periodically rather than immediately, to save some unnecessary processing cycles.
I have:
- Run `make reviewable` to ensure this PR is ready for review.

How has this code been tested
Special notes for your reviewer