
WorkerResolver API Redesign: Built-in worker version filtering #393

@EdsonPetry


We want to open a discussion on redesigning the WorkerResolver API so that datafusion-distributed supports built-in worker version filtering. With worker versioning landing in #389, users can set and query any worker's version, but the filtering logic itself is left entirely to the user.

Currently, the documentation suggests that users set up a background polling loop (or do this inside their worker discovery loop), cache workers (to avoid redundant polls), and filter inside their WorkerResolver implementation so that WorkerResolver::get_urls() only returns workers with compatible versions. For the sake of discussion, we assume that "compatible" means "identical".
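To make the current user-side burden concrete, here is a minimal, dependency-free sketch of the cache-and-filter part a user has to write today. `VersionCache`, `unknown`, `record` and `compatible_urls` are hypothetical names, URLs are plain `String`s instead of `Url`, and the actual polling (e.g. a GetWorkerInfo call) is left out.

```rust
use std::collections::HashMap;

/// Hypothetical user-side cache of worker URL -> reported version.
/// URLs are plain `String`s to keep the sketch dependency-free.
#[derive(Default)]
pub struct VersionCache {
    versions: HashMap<String, String>,
}

impl VersionCache {
    /// Candidates whose version has not been fetched yet; only these
    /// need to be polled, which avoids redundant polls.
    pub fn unknown<'a>(&self, candidates: &'a [String]) -> Vec<&'a String> {
        candidates
            .iter()
            .filter(|url| !self.versions.contains_key(url.as_str()))
            .collect()
    }

    /// Record the version a worker reported when it was polled.
    pub fn record(&mut self, url: &str, version: &str) {
        self.versions.insert(url.to_string(), version.to_string());
    }

    /// What get_urls() would return: candidates whose cached version equals
    /// `expected`. Workers that have not been polled yet are excluded.
    pub fn compatible_urls(&self, candidates: &[String], expected: &str) -> Vec<String> {
        candidates
            .iter()
            .filter(|url| self.versions.get(url.as_str()).map(String::as_str) == Some(expected))
            .cloned()
            .collect()
    }
}
```

Every user currently has to reimplement some variant of this, plus the polling loop that feeds `record`, which is the duplication this proposal aims to remove.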

#383 introduces async distributed planning, moving plan distribution from the optimizer rule to execution time inside DistributedExec. This could allow the library itself to perform async operations during worker resolution, such as calling GetWorkerInfo.

This issue proposes moving version filtering into datafusion-distributed itself, so that users only need to provide a stream of candidate worker URLs and a desired version, and the library handles the rest.

@gabotechs and I met to discuss this and have some ideas about how it could work, but we would appreciate any feedback or alternative approaches.

Proposed Approaches

  1. Make WorkerResolver a struct: the user provides a watch::Receiver<Vec<Url>> carrying their latest known worker URLs. This might look something like the following:
```rust
// User code
use std::time::Duration;
use tokio::sync::watch;
use url::Url;

let (tx, rx) = watch::channel::<Vec<Url>>(Vec::new());

// User service discovery loop, responsible for producing candidate URLs
tokio::spawn(async move {
    loop {
        match discover_workers().await {
            // Publish the latest snapshot; stop once every receiver is dropped.
            Ok(urls) => {
                if tx.send(urls).is_err() {
                    break;
                }
            }
            Err(e) => eprintln!("worker discovery failed: {e}"),
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
});

let session_state = SessionStateBuilder::new()
    .with_default_features()
    .with_distributed_worker_resolver(
        WorkerResolver::new(rx).with_expected_version("1.0.0"),
    )
    .build();

// Library code
pub struct WorkerResolver {
    /// Cache of worker URL -> version, avoids re-querying known workers.
    cache: moka::sync::Cache<Url, String>,
    /// User-provided stream of candidate worker URLs.
    rx: watch::Receiver<Vec<Url>>,
    /// If set, only return workers reporting this version.
    expected_version: Option<String>,
}

impl WorkerResolver {
    pub fn new(rx: watch::Receiver<Vec<Url>>) -> Self { /* ... */ }
    pub fn with_expected_version(mut self, version: impl Into<String>) -> Self { /* ... */ }
    /// Called internally by the library during async distributed planning.
    pub(crate) async fn resolve_urls(&self) -> Result<Vec<Url>, DataFusionError> { /* ... */ }
}
```
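One possible shape for the body of `resolve_urls` under this approach is sketched below. It is a synchronous model rather than the proposed implementation: `ResolverSketch` is a hypothetical name, a `Mutex<HashMap>` stands in for the moka cache, `String` for `Url`, the `candidates` slice for the latest `rx.borrow()` snapshot, and the `query_version` closure for an async GetWorkerInfo call.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical, synchronous model of the proposed resolve_urls() logic.
pub struct ResolverSketch {
    /// Stand-in for the moka cache: worker URL -> reported version.
    cache: Mutex<HashMap<String, String>>,
    expected_version: Option<String>,
}

impl ResolverSketch {
    pub fn new(expected_version: Option<&str>) -> Self {
        Self {
            cache: Mutex::new(HashMap::new()),
            expected_version: expected_version.map(str::to_string),
        }
    }

    /// `candidates` plays the role of the latest snapshot from the watch channel.
    pub fn resolve_urls<F>(&self, candidates: &[String], query_version: F) -> Vec<String>
    where
        F: Fn(&str) -> Result<String, String>,
    {
        // Without an expected version, every candidate is usable as-is.
        let Some(expected) = self.expected_version.as_deref() else {
            return candidates.to_vec();
        };
        let mut cache = self.cache.lock().unwrap();
        let mut resolved = Vec::new();
        for url in candidates {
            // Only query workers whose version is not cached yet.
            if !cache.contains_key(url) {
                match query_version(url.as_str()) {
                    Ok(version) => {
                        cache.insert(url.clone(), version);
                    }
                    // Unreachable workers are skipped and not cached,
                    // so they are retried on the next resolution.
                    Err(_) => continue,
                }
            }
            if cache.get(url).map(String::as_str) == Some(expected) {
                resolved.push(url.clone());
            }
        }
        resolved
    }
}
```

In a real async version the lock should not be held across the GetWorkerInfo `.await`; querying unknown workers concurrently and then inserting into a concurrent cache such as moka avoids that.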
  2. Extend WorkerResolver with version-aware methods, and let the library handle caching and filtering internally with its own implementation (similar to WorkerResolverExtension).
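```rust
pub trait WorkerResolver {
    /// get_urls() now only provides the latest set of candidate worker URLs;
    /// caching and version filtering happen inside the library.
    async fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
    /// If set, the library only returns workers reporting this version.
    fn expected_version(&self) -> Option<&str> { None }
}
```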
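The appeal of this variant is that version filtering stays opt-in through a default method. The sketch below models it synchronously under stated assumptions: `WorkerResolverSketch`, `StaticResolver` and `resolve_filtered` are hypothetical names, `get_urls()` is not async and returns `String` errors instead of `DataFusionError`, and `worker_version` stands in for the library's cached GetWorkerInfo lookup.

```rust
/// Synchronous stand-in for the proposed trait.
pub trait WorkerResolverSketch {
    /// Latest set of candidate worker URLs.
    fn get_urls(&self) -> Result<Vec<String>, String>;
    /// Opt-in: by default, no version filtering is applied.
    fn expected_version(&self) -> Option<&str> {
        None
    }
}

/// Hypothetical user implementation: a fixed list, optionally pinned to a version.
pub struct StaticResolver {
    pub urls: Vec<String>,
    pub version: Option<String>,
}

impl WorkerResolverSketch for StaticResolver {
    fn get_urls(&self) -> Result<Vec<String>, String> {
        Ok(self.urls.clone())
    }
    fn expected_version(&self) -> Option<&str> {
        self.version.as_deref()
    }
}

/// Hypothetical library-side step. `worker_version` returns None for
/// workers whose version is unknown or that are unreachable.
pub fn resolve_filtered<R, F>(resolver: &R, worker_version: F) -> Result<Vec<String>, String>
where
    R: WorkerResolverSketch + ?Sized,
    F: Fn(&str) -> Option<String>,
{
    let urls = resolver.get_urls()?;
    Ok(match resolver.expected_version() {
        // Trait default: pass every candidate through unchanged.
        None => urls,
        Some(expected) => urls
            .into_iter()
            .filter(|url| worker_version(url.as_str()).as_deref() == Some(expected))
            .collect(),
    })
}
```

Existing resolvers that do not override `expected_version()` keep their current behavior, which is one argument for this variant over replacing the trait with a struct.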
