From a494438a4c374260d57f0ebda4e2a144d5ed6fee Mon Sep 17 00:00:00 2001
From: Darren Bolduc
Date: Thu, 5 Feb 2026 01:14:12 -0500
Subject: [PATCH] impl(pubsub): add options for lease RPCs

---
 src/pubsub/src/subscriber/retry_policy.rs | 181 +++++++++++++++++++++-
 1 file changed, 179 insertions(+), 2 deletions(-)

diff --git a/src/pubsub/src/subscriber/retry_policy.rs b/src/pubsub/src/subscriber/retry_policy.rs
index 8c30a48718..ad641c6158 100644
--- a/src/pubsub/src/subscriber/retry_policy.rs
+++ b/src/pubsub/src/subscriber/retry_policy.rs
@@ -13,10 +13,13 @@
 // limitations under the License.
 
 use crate::Error;
+use gax::backoff_policy::BackoffPolicy;
 use gax::error::rpc::Code;
-use gax::retry_policy::RetryPolicy;
+use gax::options::RequestOptions;
+use gax::retry_policy::{RetryPolicy, RetryPolicyExt};
 use gax::retry_result::RetryResult;
 use gax::retry_state::RetryState;
+use std::time::Duration;
 
 /// The subscriber's retry policy, specifically for StreamingPull RPCs.
 ///
@@ -55,8 +58,66 @@ impl RetryPolicy for StreamRetryPolicy {
     }
 }
 
+/// Retry policy that treats only transport errors as retryable.
+///
+/// Transport errors are returned as `Continue`; every other error is
+/// returned as `Permanent`.
+#[derive(Debug)]
+struct OnlyTransportErrors;
+
+impl RetryPolicy for OnlyTransportErrors {
+    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
+        if error.is_transport() {
+            RetryResult::Continue(error)
+        } else {
+            RetryResult::Permanent(error)
+        }
+    }
+}
+
+/// Backoff policy that never waits between attempts.
+#[derive(Debug)]
+struct NoBackoff;
+
+impl BackoffPolicy for NoBackoff {
+    fn on_failure(&self, _state: &RetryState) -> Duration {
+        Duration::ZERO
+    }
+}
+
+/// The policies for lease management RPCs in at-least-once delivery.
+///
+/// Specifically, these are the `Acknowledge` and `ModifyAckDeadline` RPCs.
+///
+/// The GFE will send a GO_AWAY frame to a tonic channel that has been open for
+/// an hour. Tonic will reset the connection, but this can race with accepting
+/// one of our requests.
+///
+/// These requests fail with a variety of transport errors. It is safe to retry
+/// these requests immediately.
+///
+/// Note that an RPC attempt that fails because a channel is closed may be
+/// retried on another channel that is closed. That is why we retry up to N
+/// times where N is the number of channels in the client.
+///
+/// The attempt limit is `grpc_subchannel_count + 1` (the initial attempt plus
+/// one retry per channel), saturating at `u32::MAX` instead of truncating or
+/// overflowing when the count does not fit in a `u32`.
+#[allow(dead_code)] // TODO(#4471) - use in DefaultLeaser
+pub(super) fn at_least_once_options(grpc_subchannel_count: usize) -> RequestOptions {
+    // A plain `as u32 + 1` would silently truncate large counts and could
+    // overflow on the increment.
+    let attempt_limit = u32::try_from(grpc_subchannel_count)
+        .unwrap_or(u32::MAX)
+        .saturating_add(1);
+    let mut o = RequestOptions::default();
+    o.set_retry_policy(OnlyTransportErrors.with_attempt_limit(attempt_limit));
+    o.set_backoff_policy(NoBackoff);
+    o
+}
+
 #[cfg(test)]
-mod tests {
+pub(super) mod tests {
     use super::*;
     use gax::error::CredentialsError;
     use gax::error::rpc::Status;
@@ -175,4 +236,120 @@ mod tests {
 
         assert_eq!(retry.remaining_time(&state), None);
     }
+
+    #[test]
+    fn only_transport_errors() {
+        let retry = OnlyTransportErrors;
+
+        assert!(matches!(
+            retry.on_error(&RetryState::default(), transport_err()),
+            RetryResult::Continue(_)
+        ));
+
+        assert!(matches!(
+            retry.on_error(&RetryState::default(), non_transport_err()),
+            RetryResult::Permanent(_)
+        ));
+    }
+
+    #[test]
+    fn no_backoff() {
+        let backoff = NoBackoff;
+        assert_eq!(backoff.on_failure(&RetryState::default()), Duration::ZERO);
+    }
+
+    #[test]
+    fn at_least_once_options() {
+        let o = super::at_least_once_options(42);
+        verify_policies(o, 42);
+    }
+
+    #[test]
+    fn at_least_once_options_saturates() {
+        // The attempt limit must saturate at `u32::MAX` rather than wrap.
+        let o = super::at_least_once_options(usize::MAX);
+        verify_policies(o, u32::MAX - 1);
+    }
+
+    #[track_caller]
+    pub(in super::super) fn verify_policies(o: RequestOptions, grpc_subchannel_count: u32) {
+        let retry = o.retry_policy().clone().unwrap();
+        let backoff = o.backoff_policy().clone().unwrap();
+
+        let mut state = RetryState::default();
+        state.attempt_count = 1;
+        assert!(
+            matches!(
+                retry.on_error(&state, transport_err()),
+                RetryResult::Continue(_)
+            ),
+            "initial transport error should be retried"
+        );
+        assert!(
+            matches!(
+                retry.on_error(&state, non_transport_err()),
+                RetryResult::Permanent(_)
+            ),
+            "non-transport error should not be retried"
+        );
+        assert_eq!(
+            backoff.on_failure(&state),
+            Duration::ZERO,
+            "the backoff should always be 0"
+        );
+
+        state.attempt_count = grpc_subchannel_count;
+        assert!(
+            matches!(
+                retry.on_error(&state, transport_err()),
+                RetryResult::Continue(_)
+            ),
+            "we should retry transport errors up to once for each gRPC channel"
+        );
+        assert!(
+            matches!(
+                retry.on_error(&state, non_transport_err()),
+                RetryResult::Permanent(_)
+            ),
+            "non-transport error should not be retried"
+        );
+        assert_eq!(
+            backoff.on_failure(&state),
+            Duration::ZERO,
+            "the backoff should always be 0"
+        );
+
+        state.attempt_count = grpc_subchannel_count + 1;
+        assert!(
+            matches!(
+                retry.on_error(&state, transport_err()),
+                RetryResult::Exhausted(_)
+            ),
+            "the retry policy should be exhausted after trying once for each gRPC channel"
+        );
+        assert!(
+            matches!(
+                retry.on_error(&state, non_transport_err()),
+                RetryResult::Permanent(_)
+            ),
+            "non-transport error should not be retried"
+        );
+        assert_eq!(
+            backoff.on_failure(&state),
+            Duration::ZERO,
+            "the backoff should always be 0"
+        );
+    }
+
+    fn transport_err() -> Error {
+        Error::transport(HeaderMap::new(), "connection closed")
+    }
+
+    fn non_transport_err() -> Error {
+        Error::service(
+            Status::default()
+                .set_code(Code::Unavailable)
+                .set_message("bad gateway, try again"),
+        )
+    }
 }