Skip to content

Commit 0cf3d97

Browse files
authored
Merge pull request #184 from sr-gi/retrier-waits
Defines Retrier polling waiting time as a constant and fixes tests
2 parents 4ce986e + 73a9bff commit 0cf3d97

File tree

1 file changed

+65
-46
lines changed

1 file changed

+65
-46
lines changed

watchtower-plugin/src/retrier.rs

Lines changed: 65 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use crate::net::http::{self, AddAppointmentError};
1616
use crate::wt_client::{RevocationData, WTClient};
1717
use crate::{MisbehaviorProof, TowerStatus};
1818

19+
const POLLING_TIME: u64 = 1;
20+
1921
#[derive(Eq, PartialEq, Debug)]
2022
enum RetryError {
2123
// bool marks whether the Subscription error is permanent or not
@@ -167,7 +169,7 @@ impl RetryManager {
167169
}
168170
}
169171
// Sleep to not waste a lot of CPU cycles.
170-
tokio::time::sleep(Duration::from_secs(1)).await;
172+
tokio::time::sleep(Duration::from_secs(POLLING_TIME)).await;
171173
}
172174
Err(TryRecvError::Disconnected) => break,
173175
}
@@ -599,8 +601,10 @@ mod tests {
599601
const LONG_AUTO_RETRY_DELAY: u32 = 60;
600602
const SHORT_AUTO_RETRY_DELAY: u32 = 3;
601603
const API_DELAY: f64 = 0.5;
604+
const HALF_API_DELAY: f64 = API_DELAY / 2.0;
602605
const MAX_ELAPSED_TIME: u16 = 2;
603606
const MAX_INTERVAL_TIME: u16 = 1;
607+
const MAX_RUN_TIME: f64 = 0.2;
604608

605609
impl Retrier {
606610
fn empty(wt_client: Arc<Mutex<WTClient>>, tower_id: TowerId) -> Self {
@@ -658,6 +662,9 @@ mod tests {
658662
});
659663

660664
// Start the task and send the tower to the channel for retry
665+
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
666+
.unwrap();
667+
661668
let wt_client_clone = wt_client.clone();
662669
let task = tokio::spawn(async move {
663670
RetryManager::new(
@@ -670,18 +677,18 @@ mod tests {
670677
.manage_retry()
671678
.await
672679
});
673-
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
674-
.unwrap();
675680

676-
// Wait for the elapsed time and check how the tower status changed
677-
tokio::time::sleep(Duration::from_secs((API_DELAY / 2.0) as u64)).await;
681+
// Wait for a fraction of the API delay and check how the tower status changed
682+
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
678683
assert!(wt_client
679684
.lock()
680685
.unwrap()
681686
.get_retrier_status(&tower_id)
682687
.unwrap()
683688
.is_running());
684-
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
689+
690+
// Wait for the remaining time and re-check
691+
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME + HALF_API_DELAY)).await;
685692

686693
let state = wt_client.lock().unwrap();
687694
assert_eq!(
@@ -727,24 +734,24 @@ mod tests {
727734
.add_pending_appointment(tower_id, &appointment);
728735

729736
// Start the task and send the tower to the channel for retry
730-
let wt_client_clone = wt_client.clone();
731-
732-
let mut retry_manager = RetryManager::new(
733-
wt_client_clone,
734-
rx,
735-
MAX_ELAPSED_TIME + 1,
736-
SHORT_AUTO_RETRY_DELAY,
737-
MAX_INTERVAL_TIME,
738-
);
739-
let task = tokio::spawn(async move { retry_manager.manage_retry().await });
740737
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
741738
.unwrap();
742739

743-
// Wait for the elapsed time and check how the tower status changed
744-
tokio::time::sleep(Duration::from_secs_f64(
745-
(MAX_ELAPSED_TIME as f64 + 1.0) / 2.0,
746-
))
747-
.await;
740+
let wt_client_clone = wt_client.clone();
741+
let task = tokio::spawn(async move {
742+
RetryManager::new(
743+
wt_client_clone,
744+
rx,
745+
MAX_ELAPSED_TIME,
746+
SHORT_AUTO_RETRY_DELAY,
747+
MAX_INTERVAL_TIME,
748+
)
749+
.manage_retry()
750+
.await
751+
});
752+
753+
// Wait for one retry round and check the tower status
754+
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME)).await;
748755
assert!(wt_client
749756
.lock()
750757
.unwrap()
@@ -758,8 +765,9 @@ mod tests {
758765
.unwrap()
759766
.is_running());
760767

761-
// Wait until the task gives up and check again (this gives up due to accumulation of transient errors,
762-
// so the retiers will be idle).
768+
// Wait until the task gives up and check again (this gives up due to accumulation of transient errors, so the retriers will be idle).
769+
// Notice we'd normally wait for MAX_ELAPSED_TIME + MAX_RUN_TIME (the maximum time a Retrier can be working plus the marginal time of the last retry).
770+
// However, we've already waited for MAX_RUN_TIME right before to check the tower was temporarily unreachable, so we don't need to account for that again.
763771
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
764772
assert!(wt_client
765773
.lock()
@@ -867,6 +875,9 @@ mod tests {
867875
});
868876

869877
// Start the task and send the tower to the channel for retry
878+
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
879+
.unwrap();
880+
870881
let wt_client_clone = wt_client.clone();
871882
let task = tokio::spawn(async move {
872883
RetryManager::new(
@@ -879,18 +890,18 @@ mod tests {
879890
.manage_retry()
880891
.await
881892
});
882-
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
883-
.unwrap();
884-
// Wait for the elapsed time and check how the tower status changed
885-
tokio::time::sleep(Duration::from_secs((API_DELAY / 2.0) as u64)).await;
893+
894+
// Wait for a fraction of the API delay and check how the tower status changed
895+
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
886896
assert!(wt_client
887897
.lock()
888898
.unwrap()
889899
.get_retrier_status(&tower_id)
890900
.unwrap()
891901
.is_running());
892902

893-
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
903+
// Wait for the remaining time and re-check
904+
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME + HALF_API_DELAY)).await;
894905
assert_eq!(
895906
wt_client
896907
.lock()
@@ -965,6 +976,9 @@ mod tests {
965976
});
966977

967978
// Start the task and send the tower to the channel for retry
979+
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
980+
.unwrap();
981+
968982
let wt_client_clone = wt_client.clone();
969983
let task = tokio::spawn(async move {
970984
RetryManager::new(
@@ -977,25 +991,27 @@ mod tests {
977991
.manage_retry()
978992
.await
979993
});
980-
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
981-
.unwrap();
982994

983-
// Wait for the elapsed time and check how the tower status changed
984-
tokio::time::sleep(Duration::from_secs_f64(API_DELAY / 2.0)).await;
995+
// Wait for a fraction of the API delay and check how the tower status changed
996+
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
985997
assert!(wt_client
986998
.lock()
987999
.unwrap()
9881000
.get_retrier_status(&tower_id)
9891001
.unwrap()
9901002
.is_running());
9911003

992-
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
1004+
// Wait for the remaining time and re-check
1005+
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY + MAX_RUN_TIME)).await;
9931006
assert!(wt_client
9941007
.lock()
9951008
.unwrap()
9961009
.get_tower_status(&tower_id)
9971010
.unwrap()
9981011
.is_misbehaving());
1012+
1013+
// Retriers are wiped every polling interval, so we'll need to wait a bit more to check it
1014+
tokio::time::sleep(Duration::from_secs(POLLING_TIME)).await;
9991015
assert!(!wt_client.lock().unwrap().retriers.contains_key(&tower_id));
10001016
api_mock.assert();
10011017

@@ -1025,6 +1041,8 @@ mod tests {
10251041
wt_client.lock().unwrap().remove_tower(tower_id).unwrap();
10261042

10271043
// Start the task and send the tower to the channel for retry
1044+
tx.send((tower_id, RevocationData::None)).unwrap();
1045+
10281046
let wt_client_clone = wt_client.clone();
10291047
let task = tokio::spawn(async move {
10301048
RetryManager::new(
@@ -1038,9 +1056,6 @@ mod tests {
10381056
.await
10391057
});
10401058

1041-
// Send a retry request and check how the tower is removed
1042-
tx.send((tower_id, RevocationData::None)).unwrap();
1043-
tokio::time::sleep(Duration::from_secs(1)).await;
10441059
assert!(!wt_client.lock().unwrap().towers.contains_key(&tower_id));
10451060

10461061
task.abort();
@@ -1085,7 +1100,6 @@ mod tests {
10851100
let add_appointment_mock = server.mock(|when, then| {
10861101
when.method(POST).path(Endpoint::AddAppointment.path());
10871102
then.status(200)
1088-
.delay(Duration::from_secs_f64(API_DELAY))
10891103
.header("content-type", "application/json")
10901104
.json_body(json!(add_appointment_response));
10911105
});
@@ -1095,7 +1109,7 @@ mod tests {
10951109
get_registration_receipt_from_previous(&registration_receipt);
10961110
re_registration_receipt.sign(&tower_sk);
10971111
let register_mock = server.mock(|when, then| {
1098-
when.method(POST).path("/register");
1112+
when.method(POST).path(Endpoint::Register.path());
10991113
then.status(200)
11001114
.delay(Duration::from_secs_f64(API_DELAY))
11011115
.header("content-type", "application/json")
@@ -1109,6 +1123,9 @@ mod tests {
11091123
.set_tower_status(tower_id, TowerStatus::SubscriptionError);
11101124

11111125
// Start the task and send the tower to the channel for retry
1126+
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
1127+
.unwrap();
1128+
11121129
let wt_client_clone = wt_client.clone();
11131130
let task = tokio::spawn(async move {
11141131
RetryManager::new(
@@ -1121,19 +1138,18 @@ mod tests {
11211138
.manage_retry()
11221139
.await
11231140
});
1124-
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
1125-
.unwrap();
11261141

1127-
tokio::time::sleep(Duration::from_secs_f64(API_DELAY / 2.0)).await;
1142+
// Wait for a fraction of the API delay and check how the tower status changed
1143+
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
11281144
assert!(wt_client
11291145
.lock()
11301146
.unwrap()
11311147
.get_retrier_status(&tower_id)
11321148
.unwrap()
11331149
.is_running());
11341150

1135-
// Wait for the elapsed time and check how the tower status changed
1136-
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
1151+
// Wait for the remaining time and re-check
1152+
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME + HALF_API_DELAY)).await;
11371153
let state = wt_client.lock().unwrap();
11381154
assert!(!state.retriers.contains_key(&tower_id));
11391155

@@ -1192,7 +1208,10 @@ mod tests {
11921208

11931209
{
11941210
// After the retrier gives up, it should go idle and flag the tower as unreachable
1195-
tokio::time::sleep(Duration::from_secs((MAX_ELAPSED_TIME) as u64)).await;
1211+
tokio::time::sleep(Duration::from_secs_f64(
1212+
MAX_ELAPSED_TIME as f64 + MAX_RUN_TIME,
1213+
))
1214+
.await;
11961215
let state = wt_client.lock().unwrap();
11971216
assert!(state.get_retrier_status(&tower_id).unwrap().is_idle());
11981217

@@ -1212,7 +1231,7 @@ mod tests {
12121231
.unwrap();
12131232

12141233
{
1215-
tokio::time::sleep(Duration::from_secs(2)).await;
1234+
tokio::time::sleep(Duration::from_secs_f64(POLLING_TIME as f64 + MAX_RUN_TIME)).await;
12161235
let state = wt_client.lock().unwrap();
12171236
assert!(state.get_retrier_status(&tower_id).unwrap().is_idle());
12181237
let tower = state.towers.get(&tower_id).unwrap();
@@ -1261,7 +1280,7 @@ mod tests {
12611280
// Send a retry flag to the retrier to force a retry.
12621281
tx.send((tower_id, RevocationData::None)).unwrap();
12631282

1264-
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
1283+
tokio::time::sleep(Duration::from_secs_f64(POLLING_TIME as f64 + MAX_RUN_TIME)).await;
12651284
// FIXME: Here we should be able to check this, however, due to httpmock limitations, we cannot return a response based on the request.
12661285
// Therefore, both requests will be responded with the same data. Given pending_appointments is a HashSet, we cannot even know which request
12671286
// will be sent first (sets are initialized with a random state, which decides the order of iteration).

0 commit comments

Comments
 (0)