Add support for worker versioning via GetWorkerInfo RPC
#381
Closed
EdsonPetry
wants to merge
6
commits into
datafusion-contrib:gabrielmusat/custom-worker-service
from
EdsonPetry:edson.petry/worker-info-rpc
8866763
feat(worker proto): add new GetWorkerInfo rpc on WorkerService
EdsonPetry ac205b7
feat: add get_worker_info handlers in worker service
EdsonPetry 8397f60
docs: add worker versioning guide and version-aware WorkerResolver ex…
EdsonPetry 53fc775
docs corrections
EdsonPetry 4b4aca1
docs: fix incorrect SessionStateBuilder import path in worker.md
EdsonPetry e5ece04
docs: move worker_version binding into usage example where it belongs
EdsonPetry
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,3 +95,146 @@ async fn main() { | |
| ``` | ||
|
|
||
| The `into_worker_server()` method builds a `WorkerServiceServer` ready to be added as a Tonic service. | ||
|
|
||
| ## Worker Versioning | ||
|
|
||
| Workers expose a `GetWorkerInfo` gRPC endpoint that reports metadata about the running worker, | ||
|
Contributor
Author
Decided this wording was best since it's not unlikely that in the future |
||
| including a user-defined version string. This is useful during rolling deployments: when | ||
| workers running different code versions coexist in the cluster, the coordinator can route queries | ||
| only to workers running compatible code. | ||
|
|
||
| ### Setting a version | ||
|
|
||
| Use the `Worker::with_version()` builder method to tag a worker with a version string. | ||
|
|
||
| The version string is free-form: it can be a semver tag, a git SHA, a build number, or any | ||
| identifier that makes sense for your deployment workflow. Workers that don't call `with_version()` | ||
| report an empty string. | ||
|
|
||
| ```rust | ||
| let worker = Worker::default() | ||
| .with_version("2.0.0"); | ||
| ``` | ||
|
|
||
| To avoid forgetting to bump the version on each deploy, derive it from an environment variable | ||
| set by your CI/CD pipeline: | ||
|
|
||
| ```rust | ||
| let worker = Worker::default() | ||
| .with_version(std::env::var("COMMIT_HASH").unwrap_or_default()); | ||
| ``` | ||
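Note that `unwrap_or_default()` yields an empty string when `COMMIT_HASH` is unset, which is the same value reported by a worker that never called `with_version()`. A minimal sketch of a stricter fallback follows. The helper name `resolve_version` and the `"unknown"` sentinel are illustrative assumptions, not part of `datafusion_distributed`.

```rust
/// Hypothetical helper (not part of datafusion_distributed): picks the version
/// string from an optional raw value, e.g. `std::env::var("COMMIT_HASH").ok()`.
/// Missing or blank values fall back to `fallback`, so an unset variable
/// never silently turns into an empty version.
fn resolve_version(raw: Option<String>, fallback: &str) -> String {
    match raw {
        // Trim surrounding whitespace that CI systems sometimes leave behind.
        Some(v) if !v.trim().is_empty() => v.trim().to_string(),
        _ => fallback.to_string(),
    }
}
```

Assuming `with_version()` accepts a `String` as in the example above, this could be used as `Worker::default().with_version(resolve_version(std::env::var("COMMIT_HASH").ok(), "unknown"))`.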
|
|
||
| ### Querying a worker's version | ||
|
|
||
| From the coordinator, use `DefaultChannelResolver` to get a cached channel | ||
| and `create_worker_client` to build a client, then call `get_worker_info`: | ||
|
|
||
| ```rust | ||
| use datafusion_distributed::{DefaultChannelResolver, GetWorkerInfoRequest, create_worker_client}; | ||
|
|
||
| let channel_resolver = DefaultChannelResolver::default(); | ||
| let channel = channel_resolver.get_channel(&worker_url).await?; | ||
| let mut client = create_worker_client(channel); | ||
|
|
||
| let response = client.get_worker_info(GetWorkerInfoRequest {}).await?; | ||
| println!("version: {}", response.into_inner().version_number); | ||
| ``` | ||
|
|
||
| ### Zero-downtime rolling deployments | ||
|
|
||
| During a rolling deployment, workers transition from version A to version B over time. To avoid | ||
| routing queries to workers running incompatible code, you can filter workers by version before | ||
| the planner sees them. | ||
|
|
||
| The recommended pattern is: | ||
|
|
||
| 1. **Background polling loop**: Periodically query each worker's version and maintain a filtered | ||
| list of compatible URLs. | ||
| 2. **Version-aware WorkerResolver**: Implement `WorkerResolver::get_urls()` to return only the | ||
| compatible URLs from the filtered list. | ||
|
|
||
| ```rust | ||
| use std::sync::{Arc, RwLock}; | ||
| use std::time::Duration; | ||
| use url::Url; | ||
| use datafusion::common::DataFusionError; | ||
| use datafusion_distributed::{ | ||
| DefaultChannelResolver, GetWorkerInfoRequest, WorkerResolver, create_worker_client, | ||
| }; | ||
|
|
||
| struct VersionAwareWorkerResolver { | ||
| compatible_urls: Arc<RwLock<Vec<Url>>>, | ||
| } | ||
|
|
||
| impl VersionAwareWorkerResolver { | ||
| /// Starts a background task that periodically polls all known worker URLs | ||
| /// and filters them by the expected version. | ||
| fn start_version_filtering( | ||
| known_urls: Vec<Url>, | ||
| expected_version: String, | ||
| ) -> Self { | ||
| let compatible_urls = Arc::new(RwLock::new(vec![])); | ||
| let urls_handle = compatible_urls.clone(); | ||
|
|
||
| tokio::spawn(async move { | ||
| let channel_resolver = DefaultChannelResolver::default(); | ||
| loop { | ||
| let mut filtered = vec![]; | ||
| for url in &known_urls { | ||
| if let Ok(channel) = channel_resolver.get_channel(url).await { | ||
| let mut client = create_worker_client(channel); | ||
| if let Ok(resp) = client.get_worker_info(GetWorkerInfoRequest {}).await { | ||
| if resp.into_inner().version_number == expected_version { | ||
| filtered.push(url.clone()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| *urls_handle.write().unwrap() = filtered; | ||
| tokio::time::sleep(Duration::from_secs(5)).await; | ||
| } | ||
| }); | ||
|
|
||
| Self { compatible_urls } | ||
| } | ||
| } | ||
|
|
||
| impl WorkerResolver for VersionAwareWorkerResolver { | ||
| fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> { | ||
| Ok(self.compatible_urls.read().unwrap().clone()) | ||
| } | ||
| } | ||
| ``` | ||
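In the resolver above, the compatible list starts empty, so `get_urls()` returns no workers until the first polling pass completes. The filtering decision itself can be pulled out into a pure function so it can be unit-tested without a running cluster. The sketch below is an illustration only: `filter_compatible` is a hypothetical name, and URLs are modelled as plain `String`s to keep it dependency-free.

```rust
/// Hypothetical, dependency-free model of the filtering step.
/// Each entry pairs a worker URL with the version it reported;
/// `None` models a worker that was unreachable or returned an error.
fn filter_compatible(
    polled: &[(String, Option<String>)],
    expected_version: &str,
) -> Vec<String> {
    polled
        .iter()
        // Keep only workers that answered with exactly the expected version.
        .filter(|(_, version)| version.as_deref() == Some(expected_version))
        .map(|(url, _)| url.clone())
        .collect()
}
```

Unreachable workers and workers on another version are both dropped, which matches the behavior of the nested `if let` checks in the polling loop.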
|
|
||
| With the resolver in place, wire it into the session and tag each worker with a version. | ||
|
|
||
| ```rust | ||
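| use std::sync::Arc; | ||
| use datafusion::execution::SessionStateBuilder; | ||
| use datafusion::prelude::SessionContext; | ||
| use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule, Worker}; | ||
| use tonic::transport::Server; | ||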
|
|
||
| let worker_version = std::env::var("COMMIT_HASH").unwrap_or_default(); | ||
|
|
||
| // `known_urls` comes from your service discovery. | ||
| let resolver = VersionAwareWorkerResolver::start_version_filtering( | ||
| known_urls, | ||
| worker_version.clone(), | ||
| ); | ||
|
|
||
| let state = SessionStateBuilder::new() | ||
| .with_default_features() | ||
| .with_distributed_worker_resolver(resolver) | ||
| .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) | ||
| .build(); | ||
|
|
||
| let ctx = SessionContext::from(state); | ||
|
|
||
| let worker = Worker::default().with_version(worker_version); | ||
|
|
||
| Server::builder() | ||
| .add_service(worker.into_worker_server()) | ||
| .serve(addr) | ||
| .await?; | ||
| ``` | ||
|
|
||
| The coordinator's resolver continuously polls all known URLs in the background. | ||
| Only workers that respond with the correct version will appear in `get_urls()`. | ||
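The example above treats "compatible" as exact string equality. If your version strings are semver tags, a looser policy may be preferable during a rollout. The following is a sketch of one possible policy, not something the library provides: `same_major` is a hypothetical helper that compares only the leading major component and falls back to exact equality for non-semver strings such as git SHAs.

```rust
/// Hypothetical compatibility policy (an assumption, not library behavior):
/// two versions are compatible when their leading major component matches.
/// Strings without a numeric leading component are compared exactly.
fn same_major(a: &str, b: &str) -> bool {
    // Extracts the number before the first '.', ignoring a leading 'v'.
    fn major(v: &str) -> Option<u64> {
        v.trim_start_matches('v').split('.').next()?.parse().ok()
    }
    match (major(a), major(b)) {
        (Some(x), Some(y)) => x == y,
        // At least one side is not semver-like: require an exact match.
        _ => a == b,
    }
}
```

Such a predicate could replace the `==` comparison inside the polling loop, at the cost of assuming that minor and patch releases of your workers stay wire-compatible.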
newline removed by my IDE's markdown formatter