Conversation

4ef7b3c to f3f5769
Thanks for submitting this! Unfortunately I don't think I'll have time to review this any time soon. However, the first thing that comes to mind is that a 700 LOC PR for just timeout configuration is suspiciously too much code; I'd expect this kind of addition to take at least one order of magnitude less code. This tends to happen when you let LLMs run too freely with the testing approach and/or the documentation. If you think there's an opportunity for reducing the amount of code, that would be more than welcome.
```rust
use uuid::Uuid;

#[tokio::test]
async fn do_get_honors_propagated_wait_timeout() {
```
It seems do_get was mostly tested indirectly before this. Seems fitting to add this here, as it makes it easy to isolate the failure.
It had some tests before, but we deleted them on purpose, as do_get is an internal detail that is susceptible to change, which would make such tests break.
I can see this test breaking in the future even if the functionality remains the same; maybe this is better represented as an integration test?
gabotechs left a comment:
😅 ended up leaving a bunch of comments.
The TL;DR is that I think there are different reasons for each individual timeout config not to be in this project. I left individual comments for the different configs; let me know what you think.
```rust
/// Maximum time to wait while establishing a gRPC connection to a worker.
/// This is intended to bound connection setup to unreachable workers.
pub grpc_connect_timeout_ms: usize, default = grpc_connect_timeout_ms_default()
```
🤔 This one is not really per-request. The connection happens only once, and once a worker-to-worker connection is established, it is never broken. This means that this parameter will be respected just once on the first query after a deployment, and ignored until a restart of the worker process.
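The caching behavior described here can be illustrated with a self-contained sketch; the cache and types below are simplified stand-ins, not the project's actual code:

```rust
use std::collections::HashMap;

// Sketch of why the connect timeout applies only once: channels are
// cached per address, so the timeout passed on later calls is ignored.
struct ChannelCache {
    channels: HashMap<String, String>, // addr -> placeholder for a cached channel
}

impl ChannelCache {
    fn new() -> Self {
        Self { channels: HashMap::new() }
    }

    fn get_or_connect(&mut self, addr: &str, connect_timeout_ms: usize) -> &String {
        self.channels
            .entry(addr.to_string())
            // Only runs on the first call for this address; the timeout
            // from subsequent calls never takes effect.
            .or_insert_with(|| format!("channel({addr}, timeout={connect_timeout_ms}ms)"))
    }
}

fn main() {
    let mut cache = ChannelCache::new();
    let first = cache.get_or_connect("worker-1", 1_000).clone();
    // Second call passes a different timeout, but the cached channel wins.
    let second = cache.get_or_connect("worker-1", 9_999).clone();
    println!("{first}");
    println!("{second}");
}
```

Both calls print `channel(worker-1, timeout=1000ms)`: the second timeout value is silently ignored, which is the reviewer's point.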
```rust
let default_cfg = DistributedConfig::default();
let grpc_connect_timeout_ms = distributed_cfg
    .map(|cfg| cfg.grpc_connect_timeout_ms)
    .unwrap_or(default_cfg.grpc_connect_timeout_ms);
let grpc_tcp_keepalive_ms = distributed_cfg
    .map(|cfg| cfg.grpc_tcp_keepalive_ms)
    .unwrap_or(default_cfg.grpc_tcp_keepalive_ms);
let key = DefaultChannelResolverKey {
    runtime_addr: Arc::as_ptr(&task_ctx.runtime_env()) as usize,
    grpc_connect_timeout_ms,
    grpc_tcp_keepalive_ms,
};
```
This starts to get a bit hacky. It was hacky before already, but I don't think we should be doubling down on this pattern.
I think this is a limitation that could be better solved if the methods in `ChannelResolver` accepted the `Arc<TaskContext>` in which they are running as an argument.
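A rough sketch of that alternative signature, with placeholder types standing in for the real DataFusion ones (none of these names are the project's actual API):

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the real datafusion-distributed types.
struct TaskContext {
    grpc_connect_timeout_ms: usize,
}
struct Channel {
    addr: String,
}

// Sketch: the resolver receives the TaskContext it runs in, so per-query
// config can be read directly instead of being encoded into a cache key.
trait ChannelResolver {
    fn resolve(&self, addr: &str, ctx: &Arc<TaskContext>) -> Channel;
}

struct DefaultChannelResolver;

impl ChannelResolver for DefaultChannelResolver {
    fn resolve(&self, addr: &str, ctx: &Arc<TaskContext>) -> Channel {
        // Timeout read from the per-query context rather than a resolver key.
        let _timeout_ms = ctx.grpc_connect_timeout_ms;
        Channel { addr: addr.to_string() }
    }
}

fn main() {
    let ctx = Arc::new(TaskContext { grpc_connect_timeout_ms: 5_000 });
    let ch = DefaultChannelResolver.resolve("http://worker-1:8080", &ctx);
    println!("{}", ch.addr);
}
```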
Because of https://github.com/datafusion-contrib/datafusion-distributed/pull/365/changes#r2976789668, I'm not sure if it's worth the complexity...
```rust
let headers = metadata.into_headers();
let mut cfg = SessionConfig::default();
set_distributed_option_extension_from_headers::<DistributedConfig>(&mut cfg, &headers)
    .map_err(datafusion_error_to_tonic_status)?;
```
It's a shame we need to pass the full `DistributedConfig` just for `wait_plan_timeout_ms`.
If you ask me, I would not let users configure `wait_plan_timeout_ms` at all; this is an internal detail of how Distributed DataFusion works, and ideally users should not care about it.
Or, if you strongly think it's necessary, I'd probably just let it be configurable at the Worker level, so that we don't need to thread the `DistributedConfig` across inter-worker calls just for this single field.
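A minimal sketch of what a worker-level knob could look like; all names here are hypothetical, not the project's actual API:

```rust
use std::time::Duration;

// Hypothetical sketch: the timeout lives on the worker itself, so it
// never needs to travel in per-request gRPC headers.
struct WorkerConfig {
    wait_plan_timeout: Duration,
}

impl Default for WorkerConfig {
    fn default() -> Self {
        // Assumed default for illustration only.
        Self { wait_plan_timeout: Duration::from_secs(5) }
    }
}

struct Worker {
    cfg: WorkerConfig,
}

impl Worker {
    fn new(cfg: WorkerConfig) -> Self {
        Self { cfg }
    }
}

fn main() {
    // Each deployment configures its own worker; queries carry nothing extra.
    let worker = Worker::new(WorkerConfig {
        wait_plan_timeout: Duration::from_millis(2_500),
    });
    println!("{}", worker.cfg.wait_plan_timeout.as_millis());
}
```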
```rust
let grpc_connect_timeout_ms = self.grpc_connect_timeout_ms;
let grpc_tcp_keepalive_ms = self.grpc_tcp_keepalive_ms;
```
These two parameters are only relevant if users decide to live with the `DefaultChannelResolver`, but I would expect the majority of users to have their own `ChannelResolver`, as they will likely need to adapt this project to their own networking infrastructure.
This means that, for any user that does not rely on `DefaultChannelResolver`, `grpc_connect_timeout_ms` and `grpc_tcp_keepalive_ms` will pretty much be dead parameters that do nothing to their queries.
Of the two scenarios I imagine:
- People who do not want to customize their networking setup: they probably also do not care about tweaking gRPC configs, and they expect this project to ship good defaults.
- People who do want to customize their networking setup: these parameters are not useful for them, as they build their gRPC clients on their own.
```rust
/// Total timeout for an outbound `do_get` request, in milliseconds.
/// This is a full-stream deadline for the whole RPC, not an idle timeout.
pub grpc_request_timeout_ms: usize, default = grpc_request_timeout_ms_default()
```
🤔 I imagine that users will have an API for serving their DataFusion queries, that they already have a timeout on that API, and that reaching the timeout in their API will probably automatically propagate a cancellation recursively to all the workers.
What comes to mind is that the timeout should probably not even be imposed by Distributed DataFusion, and should just be whatever timeout users naturally have in their APIs.
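The caller-side pattern being suggested can be sketched with just the standard library; `run_query` is a hypothetical stand-in for dispatching a distributed query:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for running a query on a background task and
// sending its result back to the API layer.
fn run_query(tx: mpsc::Sender<&'static str>) {
    thread::sleep(Duration::from_millis(50));
    let _ = tx.send("query result");
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || run_query(tx));

    // The API's own timeout bounds the whole request; the engine does not
    // need a separate grpc_request_timeout_ms of its own.
    match rx.recv_timeout(Duration::from_millis(500)) {
        Ok(result) => println!("{result}"),
        Err(_) => println!("request timed out"),
    }
}
```

In a real service the dropped receiver (or an explicit cancellation token) would be what propagates the cancellation down to the workers.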
```rust
/// Maximum time a worker waits for task data to become available before failing the request.
pub wait_plan_timeout_ms: usize, default = wait_plan_timeout_ms_default()
/// TCP keepalive period used for worker-to-worker connections.
pub grpc_tcp_keepalive_ms: usize, default = grpc_tcp_keepalive_ms_default()
```
Same as https://github.com/datafusion-contrib/datafusion-distributed/pull/365/changes#r2976715396: the TCP keepalive will be taken into account once, and then ignored until a worker restart.
Added the option to set some tonic / gRPC configs regarding timeouts. I see this being useful for new adopters to easily configure DFD to their liking.

Added configs:
- `grpc_connect_timeout_ms`
- `grpc_request_timeout_ms`
- `wait_plan_timeout_ms`
- `grpc_tcp_keepalive_ms`

Docs can be found here: tonic, gRPC