Skip to content

Commit 2d78552

Browse files
committed
fix: use adjustable retry delays when sending data
1 parent db0159b commit 2d78552

File tree

6 files changed

+69
-78
lines changed

6 files changed

+69
-78
lines changed

common/src/cli/client.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@ pub struct ClientArguments {
1818

1919
impl From<ClientArguments> for FetcherOptions {
2020
fn from(value: ClientArguments) -> Self {
21-
FetcherOptions {
22-
timeout: value.timeout.into(),
23-
retries: value.retries,
24-
default_retry_after: value.default_retry_after.into(),
25-
}
21+
FetcherOptions::new()
22+
.timeout(value.timeout)
23+
.retries(value.retries)
24+
.retry_after(value.default_retry_after.into())
2625
}
2726
}
2827

common/src/fetcher/mod.rs

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ mod data;
44
use backon::{ExponentialBuilder, Retryable};
55
pub use data::*;
66

7-
use crate::http::get_retry_after_from_response_header;
7+
use crate::http::calculate_retry_after_from_response_header;
88
use reqwest::{Client, ClientBuilder, IntoUrl, Method, Response};
99
use std::fmt::Debug;
1010
use std::future::Future;
@@ -37,9 +37,10 @@ pub enum Error {
3737
#[non_exhaustive]
3838
#[derive(Clone, Debug)]
3939
pub struct FetcherOptions {
40-
pub timeout: Duration,
41-
pub retries: usize,
42-
pub default_retry_after: Duration,
40+
timeout: Duration,
41+
retries: usize,
42+
default_retry_after: Duration,
43+
max_retry_after: Duration,
4344
}
4445

4546
impl FetcherOptions {
@@ -61,8 +62,22 @@ impl FetcherOptions {
6162
}
6263

6364
/// Set the default retry-after duration when a 429 response doesn't include a Retry-After header.
64-
pub fn default_retry_after(mut self, duration: impl Into<Duration>) -> Self {
65-
self.default_retry_after = duration.into();
65+
pub fn retry_after(mut self, duration: Duration) -> Self {
66+
if duration > self.max_retry_after {
67+
panic!("Default retry-after cannot be greater than max retry-after (300s)");
68+
}
69+
self.default_retry_after = duration;
70+
self
71+
}
72+
73+
/// Set the default retry-after duration when a 429 response doesn't include a Retry-After header
74+
/// and checks the duration against the maximum retry-after.
75+
pub fn retry_after_with_max(mut self, default: Duration, max: Duration) -> Self {
76+
if default > max {
77+
panic!("Default retry-after cannot be greater than max retry-after");
78+
}
79+
self.default_retry_after = default;
80+
self.max_retry_after = max;
6681
self
6782
}
6883
}
@@ -73,6 +88,7 @@ impl Default for FetcherOptions {
7388
timeout: Duration::from_secs(30),
7489
retries: 5,
7590
default_retry_after: Duration::from_secs(10),
91+
max_retry_after: Duration::from_mins(5),
7692
}
7793
}
7894
}
@@ -124,33 +140,23 @@ impl Fetcher {
124140
let url = url.into_url()?;
125141

126142
let retries = self.retries;
127-
let backoff = ExponentialBuilder::default();
128-
129-
(|| async {
130-
match self.fetch_once(url.clone(), &processor).await {
131-
Ok(result) => Ok(result),
132-
Err(err) => {
133-
log::info!("Failed to retrieve: {err}");
134-
Err(err)
135-
}
136-
}
137-
})
138-
.retry(&backoff.with_max_times(retries))
139-
.notify(|err, dur| {
140-
// If rate limited, ensure we wait at least the Retry-After duration
141-
if let Error::RateLimited(retry_after) = err {
142-
if dur < *retry_after {
143-
log::info!(
144-
"Rate limited, extending wait from {:?} to {:?}",
145-
dur,
146-
retry_after
147-
);
148-
let additional = *retry_after - dur;
149-
std::thread::sleep(additional);
143+
let retry = ExponentialBuilder::default().with_max_times(retries);
144+
145+
(|| async { self.fetch_once(url.clone(), &processor).await })
146+
.retry(retry)
147+
.adjust(|e, dur| {
148+
if let Error::RateLimited(retry_after) = e {
149+
if let Some(dur_value) = dur
150+
&& dur_value > *retry_after
151+
{
152+
return dur;
153+
}
154+
Some(*retry_after) // only use server-provided delay if it's longer
155+
} else {
156+
dur // minimum delay as per backoff strategy
150157
}
151-
}
152-
})
153-
.await
158+
})
159+
.await
154160
}
155161

156162
async fn fetch_once<D: DataProcessor>(
@@ -164,7 +170,7 @@ impl Fetcher {
164170

165171
// Check for rate limiting
166172
if let Some(retry_after) =
167-
get_retry_after_from_response_header(&response, self.default_retry_after)
173+
calculate_retry_after_from_response_header(&response, self.default_retry_after)
168174
{
169175
log::info!("Rate limited (429), retry after: {:?}", retry_after);
170176
return Err(Error::RateLimited(retry_after));

common/src/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn parse_retry_after(value: &str) -> Option<RetryAfter> {
2424
None
2525
}
2626

27-
pub fn get_retry_after_from_response_header(
27+
pub fn calculate_retry_after_from_response_header(
2828
response: &Response,
2929
default_duration: Duration,
3030
) -> Option<Duration> {

common/tests/fetcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ async fn test_configurable_default_retry_after(#[case] custom_default_secs: u64)
229229
let fetcher = Fetcher::new(
230230
FetcherOptions::new()
231231
.retries(3)
232-
.default_retry_after(Duration::from_secs(custom_default_secs)),
232+
.retry_after(Duration::from_secs(custom_default_secs)),
233233
)
234234
.await
235235
.unwrap();

extras/src/visitors/send/clap.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,9 @@ impl SendArguments {
107107
)
108108
.await?;
109109

110-
Ok(SendVisitor {
111-
url: target,
112-
sender,
113-
retries,
114-
min_delay: Some(min_delay.into()),
115-
max_delay: Some(max_delay.into()),
116-
default_retry_after: None,
117-
})
110+
Ok(SendVisitor::new(target, sender)
111+
.retries(retries)
112+
.min_delay(min_delay)
113+
.max_delay(max_delay))
118114
}
119115
}

extras/src/visitors/send/mod.rs

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use bytes::Bytes;
33
use reqwest::{Body, Method, StatusCode, Url, header};
44
use std::time::Duration;
55
use walker_common::{
6-
http::get_retry_after_from_response_header,
6+
http::calculate_retry_after_from_response_header,
77
sender::{self, HttpSender},
88
};
99

@@ -49,16 +49,16 @@ pub struct SendVisitor {
4949
pub sender: HttpSender,
5050

5151
/// The number of retries in case of a server or transmission failure
52-
pub retries: usize,
52+
retries: usize,
5353

54-
/// The minimum delay between retries
55-
pub min_delay: Option<Duration>,
54+
/// The minimum delay between retries, will be overruled by the retry-after header if present.
55+
min_delay: Option<Duration>,
5656

57-
/// The maximum delay between retries
58-
pub max_delay: Option<Duration>,
57+
/// The maximum delay between retries, will be overruled by the retry-after header if present.
58+
max_delay: Option<Duration>,
5959

6060
/// The default retry-after duration when a 429 response doesn't include a Retry-After header
61-
pub default_retry_after: Option<Duration>,
61+
default_retry_after: Duration,
6262
}
6363

6464
impl SendVisitor {
@@ -69,7 +69,7 @@ impl SendVisitor {
6969
retries: 0,
7070
min_delay: None,
7171
max_delay: None,
72-
default_retry_after: Some(Duration::from_secs(10)),
72+
default_retry_after: Duration::from_secs(10),
7373
}
7474
}
7575

@@ -87,12 +87,6 @@ impl SendVisitor {
8787
self.max_delay = Some(retry_delay.into());
8888
self
8989
}
90-
91-
/// Set the default retry-after duration when a 429 response doesn't include a Retry-After header.
92-
pub fn default_retry_after(mut self, duration: impl Into<Duration>) -> Self {
93-
self.default_retry_after = Some(duration.into());
94-
self
95-
}
9690
}
9791

9892
#[derive(Debug, thiserror::Error)]
@@ -135,10 +129,9 @@ impl SendVisitor {
135129
.await
136130
.map_err(|err| SendOnceError::Temporary(err.into()))?;
137131

138-
if let Some(retry_after) = get_retry_after_from_response_header(
139-
&response,
140-
self.default_retry_after.unwrap_or(Duration::from_secs(10)),
141-
) {
132+
if let Some(retry_after) =
133+
calculate_retry_after_from_response_header(&response, self.default_retry_after)
134+
{
142135
log::warn!(
143136
"Rate limited (429) when uploading {name}, retry after: {:?}",
144137
retry_after
@@ -185,20 +178,17 @@ impl SendVisitor {
185178
Ok(
186179
(|| async { self.send_once(name, data.clone(), &customizer).await })
187180
.retry(retry)
188-
.sleep(tokio::time::sleep)
189181
.when(|e| matches!(e, SendOnceError::Temporary(_)))
190-
.notify(|err, dur| {
191-
// If rate limited, ensure we wait at least the Retry-After duration
192-
if let SendOnceError::Temporary(SendError::RateLimited(retry_after)) = err {
193-
if dur < *retry_after {
194-
log::info!(
195-
"Rate limited, extending wait from {:?} to {:?}",
196-
dur,
197-
retry_after
198-
);
199-
let additional = *retry_after - dur;
200-
std::thread::sleep(additional);
182+
.adjust(|e, dur| {
183+
if let SendOnceError::Temporary(SendError::RateLimited(retry_after)) = e {
184+
if let Some(dur_value) = dur
185+
&& dur_value > *retry_after
186+
{
187+
return dur;
201188
}
189+
Some(*retry_after) // only use server-provided delay if it's longer
190+
} else {
191+
dur // minimum delay as per backoff strategy
202192
}
203193
})
204194
.await?,

0 commit comments

Comments
 (0)