Skip to content

Commit b48c0c6

Browse files
committed
feat: query multiple beacon nodes in parallel
1 parent c2aa360 commit b48c0c6

File tree

3 files changed

+290
-3
lines changed

3 files changed

+290
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

anchor/validator_store/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ beacon_node_fallback = { workspace = true }
99
database = { workspace = true }
1010
eth2 = { workspace = true }
1111
ethereum_ssz = { workspace = true }
12+
futures = { workspace = true }
1213
hex = { workspace = true }
1314
lru = { workspace = true }
1415
metrics = { workspace = true }

anchor/validator_store/src/metadata_service.rs

Lines changed: 288 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,26 @@
1-
use std::{collections::HashMap, sync::Arc, time::Duration};
1+
use std::{
2+
collections::HashMap,
3+
sync::Arc,
4+
time::{Duration, Instant},
5+
};
26

37
use beacon_node_fallback::BeaconNodeFallback;
8+
use eth2::{BeaconNodeHttpClient, types::BlockId};
9+
use futures::{StreamExt, stream::FuturesUnordered};
410
use slot_clock::SlotClock;
511
use ssv_types::{ValidatorIndex, consensus::BeaconVote};
612
use task_executor::TaskExecutor;
713
use tokio::time::sleep;
8-
use tracing::{error, info, trace};
9-
use types::{ChainSpec, EthSpec};
14+
use tracing::{debug, error, info, trace, warn};
15+
use types::{AttestationData, ChainSpec, EthSpec, Hash256, Slot};
1016
use validator_services::duties_service::DutiesService;
1117

1218
use crate::{AnchorValidatorStore, ContributionWaiter, SlotMetadata};
1319

20+
const SOFT_TIMEOUT: Duration = Duration::from_millis(500);
21+
const HARD_TIMEOUT: Duration = Duration::from_secs(1);
22+
const BLOCK_SLOT_LOOKUP_TIMEOUT: Duration = Duration::from_millis(250);
23+
1424
pub struct MetadataService<E: EthSpec, T: SlotClock + 'static> {
1525
duties_service: Arc<DutiesService<AnchorValidatorStore<T, E>, T>>,
1626
validator_store: Arc<AnchorValidatorStore<T, E>>,
@@ -78,6 +88,12 @@ impl<E: EthSpec, T: SlotClock + 'static> MetadataService<E, T> {
7888
async fn update_metadata(&self) -> Result<(), String> {
7989
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
8090

91+
let weighted = false;
92+
93+
if weighted {
94+
self.weighted_calculation(slot).await?;
95+
}
96+
8197
let attestation_data = self
8298
.beacon_nodes
8399
.first_success(|beacon_node| async move {
@@ -157,4 +173,273 @@ impl<E: EthSpec, T: SlotClock + 'static> MetadataService<E, T> {
157173

158174
Ok(())
159175
}
176+
177+
async fn weighted_calculation(&self, slot: Slot) -> Result<AttestationData, String> {
178+
let started = Instant::now();
179+
180+
let clients: Vec<(String, BeaconNodeHttpClient)> = {
181+
let candidates = self.beacon_nodes.candidates.read().await;
182+
candidates
183+
.iter()
184+
.map(|c| (c.beacon_node.to_string(), c.beacon_node.clone()))
185+
.collect()
186+
};
187+
188+
let num_clients = clients.len();
189+
190+
debug!(num_clients, "Starting weighted attestation data fetch");
191+
192+
// Spawn all fetch requests in parallel
193+
let mut futures: FuturesUnordered<_> = clients
194+
.into_iter()
195+
.map(|(addr, client)| async move {
196+
let result = self.fetch_and_score(&client, slot).await;
197+
(addr, result)
198+
})
199+
.collect();
200+
201+
let mut succeeded = 0;
202+
let mut failed = 0;
203+
let mut best_data: Option<ScoredAttestationData> = None;
204+
let mut soft_timeout_hit = false;
205+
206+
// We have two timeouts: a soft timeout and a hard timeout.
207+
// At the soft timeout, we return if we have any responses so far.
208+
// At the hard timeout, we return unconditionally.
209+
// The soft timeout is half the duration of the hard timeout.
210+
211+
// Collect responses until soft timeout (500ms)
212+
let soft_timeout = sleep(SOFT_TIMEOUT);
213+
tokio::pin!(soft_timeout);
214+
215+
loop {
216+
tokio::select! {
217+
biased;
218+
219+
Some((addr, result)) = futures.next() => {
220+
match result {
221+
Ok(scored_attestation) => {
222+
succeeded += 1;
223+
trace!(
224+
elapsed_ms = started.elapsed().as_millis(),
225+
client = %scored_attestation.client_addr,
226+
score = scored_attestation.score,
227+
succeeded,
228+
failed,
229+
"Attestation data received"
230+
);
231+
232+
// Update best if this score is higher
233+
best_data = Some(match best_data {
234+
Some(current) if current.score >= scored_attestation.score => current,
235+
_ => {
236+
debug!(
237+
client = %scored_attestation.client_addr,
238+
score = scored_attestation.score,
239+
"New best attestation data"
240+
);
241+
scored_attestation
242+
}
243+
});
244+
}
245+
Err(e) => {
246+
failed += 1;
247+
warn!(
248+
elapsed_ms = started.elapsed().as_millis(),
249+
client = %addr,
250+
error = %e,
251+
succeeded,
252+
failed,
253+
"Failed to fetch attestation data"
254+
);
255+
}
256+
}
257+
258+
// All responses received, exit early
259+
if succeeded + failed == num_clients {
260+
break;
261+
}
262+
}
263+
264+
() = &mut soft_timeout, if !soft_timeout_hit => {
265+
soft_timeout_hit = true;
266+
debug!(
267+
elapsed_ms = started.elapsed().as_millis(),
268+
succeeded,
269+
failed,
270+
pending = num_clients - succeeded - failed,
271+
"Soft timeout reached"
272+
);
273+
274+
// If we have at least one response, return early
275+
if best_data.is_some() {
276+
break;
277+
}
278+
}
279+
280+
else => break,
281+
}
282+
}
283+
284+
// If no responses yet, wait until hard timeout (1s)
285+
if best_data.is_none() && succeeded + failed < num_clients {
286+
let remaining = HARD_TIMEOUT.saturating_sub(started.elapsed());
287+
let hard_timeout = sleep(remaining);
288+
tokio::pin!(hard_timeout);
289+
290+
loop {
291+
tokio::select! {
292+
biased;
293+
294+
Some((addr, result)) = futures.next() => {
295+
match result {
296+
Ok(scored_attestation) => {
297+
succeeded += 1;
298+
trace!(
299+
elapsed_ms = started.elapsed().as_millis(),
300+
client = %scored_attestation.client_addr,
301+
score = scored_attestation.score,
302+
"Response received (hard timeout phase)"
303+
);
304+
305+
best_data = Some(match best_data {
306+
Some(current) if current.score >= scored_attestation.score => current,
307+
_ => scored_attestation,
308+
});
309+
}
310+
Err(e) => {
311+
failed += 1;
312+
warn!(
313+
client = %addr,
314+
error = %e,
315+
"Error in hard timeout phase"
316+
);
317+
}
318+
}
319+
320+
if succeeded + failed == num_clients {
321+
break;
322+
}
323+
}
324+
325+
() = &mut hard_timeout => {
326+
error!(
327+
elapsed_ms = started.elapsed().as_millis(),
328+
succeeded,
329+
failed,
330+
timed_out = num_clients - succeeded - failed,
331+
"Hard timeout reached"
332+
);
333+
break;
334+
}
335+
336+
else => break,
337+
}
338+
}
339+
}
340+
341+
// Return best result or error if none received
342+
match best_data {
343+
Some(scored_attestation) => {
344+
debug!(
345+
elapsed_ms = started.elapsed().as_millis(),
346+
client = %scored_attestation.client_addr,
347+
score = scored_attestation.score,
348+
succeeded,
349+
failed,
350+
"Selected best attestation data"
351+
);
352+
Ok(scored_attestation.attestation_data)
353+
}
354+
None => Err(format!(
355+
"No attestation data received from any of {} beacon nodes (succeeded: {}, failed: {})",
356+
num_clients, succeeded, failed
357+
)),
358+
}
359+
}
360+
361+
async fn fetch_and_score(
362+
&self,
363+
client: &BeaconNodeHttpClient,
364+
slot: Slot,
365+
) -> Result<ScoredAttestationData, String> {
366+
let client_addr = client.to_string();
367+
368+
// Get attestation data
369+
let attestation_data = client
370+
.get_validator_attestation_data(slot, 0)
371+
.await
372+
.map_err(|e| format!("{client_addr}: {e:?}"))?
373+
.data;
374+
375+
// Calculate base score from checkpoint epochs (higher epochs = more recent)
376+
let base_score = (attestation_data.source.epoch.as_u64()
377+
+ attestation_data.target.epoch.as_u64()) as f64;
378+
379+
// Try to get head slot for bonus scoring
380+
let score = match self
381+
.get_block_slot(client, attestation_data.beacon_block_root)
382+
.await
383+
{
384+
Some(head_slot) => {
385+
// Bonus based on how close head is to attestation slot
386+
let distance = slot.as_u64().saturating_sub(head_slot.as_u64());
387+
let bonus = 1.0 / (1 + distance) as f64;
388+
389+
trace!(
390+
client = %client_addr,
391+
head_slot = head_slot.as_u64(),
392+
attestation_slot = slot.as_u64(),
393+
source_epoch = attestation_data.source.epoch.as_u64(),
394+
target_epoch = attestation_data.target.epoch.as_u64(),
395+
base_score,
396+
bonus,
397+
total_score = base_score + bonus,
398+
"Scored attestation data"
399+
);
400+
401+
base_score + bonus
402+
}
403+
None => {
404+
trace!(
405+
client = %client_addr,
406+
base_score,
407+
"Using base score only (no head slot)"
408+
);
409+
base_score
410+
}
411+
};
412+
413+
Ok(ScoredAttestationData {
414+
client_addr,
415+
attestation_data,
416+
score,
417+
})
418+
}
419+
420+
/// Get the slot number for a given block root with timeout
421+
async fn get_block_slot(
422+
&self,
423+
client: &BeaconNodeHttpClient,
424+
block_root: Hash256,
425+
) -> Option<Slot> {
426+
tokio::time::timeout(BLOCK_SLOT_LOOKUP_TIMEOUT, async {
427+
client
428+
.get_beacon_blocks::<E>(BlockId::Root(block_root))
429+
.await
430+
.ok()
431+
.flatten()
432+
.map(|resp| resp.data().slot())
433+
})
434+
.await
435+
.ok()
436+
.flatten()
437+
}
438+
}
439+
440+
#[derive(Debug, Clone)]
441+
struct ScoredAttestationData {
442+
client_addr: String,
443+
attestation_data: AttestationData,
444+
score: f64,
160445
}

0 commit comments

Comments
 (0)