
gRPC timeout configs#365

Open
gene-bordegaray wants to merge 6 commits into main from gene.bordegaray/2026/03/gRPC_timeout_configs

Conversation


gene-bordegaray (Collaborator) commented Mar 6, 2026

Added the option to set some tonic / gRPC configs related to timeouts. I see this being useful for new adopters who want to easily configure DFD to their liking.

Added Configs

Docs can be found here: tonic, gRPC

  • grpc_connect_timeout_ms
    • can I establish a channel to the worker?
  • grpc_request_timeout_ms
    • can the full do_get RPC finish in time?
  • wait_plan_timeout_ms
    • once I reached the worker, how long do I wait for task data?
  • grpc_tcp_keepalive_ms
    • how do we notice the underlying TCP connection failed?
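The four options above can be sketched as a plain config struct whose millisecond fields are converted to `Duration`s at the point of use. This is a minimal stdlib-only sketch: the field names come from this PR, but the default values and the `unwrap_or_default` fallback shown here are illustrative assumptions, not the PR's actual defaults.

```rust
use std::time::Duration;

/// Sketch of the new config surface; the defaults below are illustrative only.
struct DistributedConfig {
    grpc_connect_timeout_ms: u64, // can I establish a channel to the worker?
    grpc_request_timeout_ms: u64, // can the full do_get RPC finish in time?
    wait_plan_timeout_ms: u64,    // how long do I wait for task data?
    grpc_tcp_keepalive_ms: u64,   // how do we notice a dead TCP connection?
}

impl Default for DistributedConfig {
    fn default() -> Self {
        Self {
            grpc_connect_timeout_ms: 5_000,
            grpc_request_timeout_ms: 60_000,
            wait_plan_timeout_ms: 10_000,
            grpc_tcp_keepalive_ms: 30_000,
        }
    }
}

fn main() {
    // An explicit config overrides the defaults; `None` falls back to them,
    // mirroring the `unwrap_or` pattern used elsewhere in this PR.
    let user_cfg: Option<DistributedConfig> = None;
    let cfg = user_cfg.unwrap_or_default();

    // Millisecond fields become Durations only where they are consumed.
    let connect_timeout = Duration::from_millis(cfg.grpc_connect_timeout_ms);
    assert_eq!(connect_timeout, Duration::from_secs(5));
    println!("connect timeout: {connect_timeout:?}");
}
```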

gene-bordegaray force-pushed the gene.bordegaray/2026/03/gRPC_timeout_configs branch from 4ef7b3c to f3f5769 on March 14, 2026 at 12:10

gabotechs commented Mar 17, 2026

Thanks for submitting this! Unfortunately I don't think I'll have time to review this any time soon. However, the first thing that comes to mind is that a 700 LOC PR for just timeout configuration is suspiciously too much code; I'd expect this kind of addition to take at least an order of magnitude less code.

This tends to happen when you let LLMs run too freely with the testing approach and/or the documentation. If you think there's an opportunity to reduce the amount of code, that would be more than welcome.

use uuid::Uuid;

#[tokio::test]
async fn do_get_honors_propagated_wait_timeout() {
gene-bordegaray (Collaborator, author) commented:

Seems do_get was mostly tested indirectly before this. It seems fitting to add this here, as it's easy to isolate the failure.

gabotechs (Collaborator) replied:

It had some tests before, but we deleted them on purpose, as do_get is an internal detail that is susceptible to change, making this test break.

I can see how this test will break in the future even if the functionality remains the same, maybe this is better represented as an integration test?


gabotechs (Collaborator) left a comment


😅 ended up leaving a bunch of comments.

The TL;DR is that I think there are different reasons why each individual timeout config shouldn't be in this project. I left individual comments for the different configs; let me know what you think.

Comment on lines +51 to +53
/// Maximum time to wait while establishing a gRPC connection to a worker.
/// This is intended to bound connection setup to unreachable workers.
pub grpc_connect_timeout_ms: usize, default = grpc_connect_timeout_ms_default()

🤔 This one is not really per-request. The connection happens only once, and once a worker-to-worker connection is established, it is never broken. This means this parameter will be respected just once, on the first query after a deployment, and then ignored until the worker process restarts.

Comment on lines +95 to +106
let default_cfg = DistributedConfig::default();
let grpc_connect_timeout_ms = distributed_cfg
    .map(|cfg| cfg.grpc_connect_timeout_ms)
    .unwrap_or(default_cfg.grpc_connect_timeout_ms);
let grpc_tcp_keepalive_ms = distributed_cfg
    .map(|cfg| cfg.grpc_tcp_keepalive_ms)
    .unwrap_or(default_cfg.grpc_tcp_keepalive_ms);
let key = DefaultChannelResolverKey {
    runtime_addr: Arc::as_ptr(&task_ctx.runtime_env()) as usize,
    grpc_connect_timeout_ms,
    grpc_tcp_keepalive_ms,
};

This starts to get a bit hacky; it was already hacky before, but I don't think we should be doubling down on this pattern.


I think this is a limitation that could be better solved if the methods in ChannelResolver accepted the Arc&lt;TaskContext&gt; they are running in as an argument.
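A minimal stdlib-only sketch of that suggestion, with stub types standing in for the real ChannelResolver and TaskContext (all names and signatures here are illustrative assumptions, not the project's actual API):

```rust
use std::sync::Arc;

// Stub standing in for DataFusion's TaskContext (hypothetical field).
struct TaskContext {
    grpc_connect_timeout_ms: u64,
}

// If the resolver received the TaskContext it runs under, per-context
// settings could be read directly instead of being smuggled through a
// pointer-based cache key.
trait ChannelResolver {
    fn resolve(&self, worker_addr: &str, ctx: &Arc<TaskContext>) -> String;
}

struct DefaultChannelResolver;

impl ChannelResolver for DefaultChannelResolver {
    fn resolve(&self, worker_addr: &str, ctx: &Arc<TaskContext>) -> String {
        // A real implementation would build a gRPC channel here; this stub
        // just shows the config flowing in through the context argument.
        format!("{worker_addr} (connect timeout {}ms)", ctx.grpc_connect_timeout_ms)
    }
}

fn main() {
    let ctx = Arc::new(TaskContext { grpc_connect_timeout_ms: 5_000 });
    let endpoint = DefaultChannelResolver.resolve("worker-1:50051", &ctx);
    assert!(endpoint.contains("5000ms"));
    println!("{endpoint}");
}
```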


let headers = metadata.into_headers();
let mut cfg = SessionConfig::default();
set_distributed_option_extension_from_headers::<DistributedConfig>(&mut cfg, &headers)
    .map_err(datafusion_error_to_tonic_status)?;
gabotechs (Collaborator) commented Mar 23, 2026

It's a shame we need to pass the full DistributedConfig just for the wait_plan_timeout_ms.

If you ask me, I think I would not let the users configure the wait_plan_timeout_ms, this is an internal detail of how Distributed DataFusion works, and ideally users should not care about this.


Or if you strongly think it's necessary, I'd probably just let it be configurable at the Worker level, so that we don't need to thread the DistributedConfig across inter-worker calls just for this single field.

Comment on lines +174 to +175
let grpc_connect_timeout_ms = self.grpc_connect_timeout_ms;
let grpc_tcp_keepalive_ms = self.grpc_tcp_keepalive_ms;

These two parameters are only relevant if users decide to live with the DefaultChannelResolver, but I would expect the majority of users to have their own ChannelResolver, as they will likely need to adapt this project to their own networking infrastructure.

This means that, for any user that does not rely on DefaultChannelResolver, grpc_connect_timeout_ms and grpc_tcp_keepalive_ms will pretty much be dead parameters that do nothing to their queries.

Of the two scenarios that I imagine:

  • People who do not want to customize their networking setup: they probably also don't care about tweaking gRPC configs, and they expect this project to ship good defaults
  • People who do want to customize their networking setup: these parameters are not useful for them, as they build their gRPC clients on their own

Comment on lines +54 to +56
/// Total timeout for an outbound `do_get` request, in milliseconds.
/// This is a full-stream deadline for the whole RPC, not an idle timeout.
pub grpc_request_timeout_ms: usize, default = grpc_request_timeout_ms_default()

🤔 I imagine that users will have an API for serving their DataFusion queries, that they already have a timeout on that API, and that reaching the timeout in their API will probably propagate a cancellation recursively to all the workers.

What comes to mind is that the timeout should probably not even be imposed by Distributed DataFusion, and should just be whatever timeout users naturally have in their APIs.
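That idea — letting the caller's own API deadline bound the query, rather than a fixed per-RPC grpc_request_timeout_ms — can be sketched with a deadline carried alongside the request. This is a stdlib-only sketch; the function name and the flow are hypothetical illustrations, not anything from this PR:

```rust
use std::time::{Duration, Instant};

/// Remaining time budget for downstream work, given the caller's overall
/// deadline. Returns `None` once the deadline has passed, so a worker can
/// fail fast instead of applying its own fixed per-RPC timeout.
fn remaining_budget(deadline: Instant) -> Option<Duration> {
    deadline.checked_duration_since(Instant::now())
}

fn main() {
    // Suppose the user's API imposed a 30s deadline when the query started.
    let deadline = Instant::now() + Duration::from_secs(30);

    // Each hop forwards whatever budget is left, rather than a fixed value.
    match remaining_budget(deadline) {
        Some(budget) => println!("forward the query with ~{}s left", budget.as_secs()),
        None => println!("deadline already exceeded; cancel recursively"),
    }
    assert!(remaining_budget(deadline).is_some());
}
```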

/// Maximum time a worker waits for task data to become available before failing the request.
pub wait_plan_timeout_ms: usize, default = wait_plan_timeout_ms_default()
/// TCP keepalive period used for worker-to-worker connections.
pub grpc_tcp_keepalive_ms: usize, default = grpc_tcp_keepalive_ms_default()

Same as https://github.com/datafusion-contrib/datafusion-distributed/pull/365/changes#r2976715396: the TCP keepalive will be taken into account once, and then ignored until a worker restart.
