diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index a65343aee..bc3fc3062 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -532,4 +532,15 @@ pub struct Node { #[clap(flatten)] pub logging_flags: FileLoggingFlags, + + #[clap( + long, + help = "Enable parallel querying and scoring of attestation data across multiple beacon nodes. \ + When enabled, Anchor queries all configured beacon nodes simultaneously and selects \ + the attestation data with the highest score based on checkpoint epochs and head block \ + proximity. Only useful when multiple beacon nodes are configured via --beacon-nodes. \ + Disabled by default.", + display_order = 0 + )] + pub with_weighted_attestation_data: bool, } diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 13e46d23d..945a0b5fa 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -76,6 +76,8 @@ pub struct Config { pub operator_dg: bool, /// Number of epochs to monitor for twins after grace period pub operator_dg_wait_epochs: u64, + /// Enable attestation data scoring across multiple beacon nodes + pub with_weighted_attestation_data: bool, /// Whether to check for matching checkpoint roots in QBFT. pub strict_mfp: bool, } @@ -123,6 +125,7 @@ impl Config { disable_latency_measurement_service: false, operator_dg: false, operator_dg_wait_epochs: 2, + with_weighted_attestation_data: false, strict_mfp: false, } } @@ -279,6 +282,8 @@ pub fn from_cli(cli_args: &Node, global_config: GlobalConfig) -> Result { duties_service: Arc, T>>, validator_store: Arc>, @@ -18,6 +28,7 @@ pub struct MetadataService { beacon_nodes: Arc>, executor: TaskExecutor, spec: Arc, + weighted_attestation_data: bool, } impl MetadataService { @@ -28,6 +39,7 @@ impl MetadataService { beacon_nodes: Arc>, executor: TaskExecutor, spec: Arc, + weighted_attestation_data: bool, ) -> Self { Self { duties_service, @@ -36,6 +48,7 @@ impl MetadataService { beacon_nodes, executor, spec, + weighted_attestation_data, } } @@ -78,21 +91,24 @@ impl MetadataService { async fn update_metadata(&self) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; - let attestation_data = self - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS_HTTP_GET], - ); - beacon_node - .get_validator_attestation_data(slot, 0) - .await - .map_err(|e| format!("Failed to produce attestation data: {e:?}")) - .map(|result| result.data) - }) - .await - .map_err(|e| e.to_string())?; + let attestation_data = if self.weighted_attestation_data { + self.weighted_calculation(slot).await? + } else { + self.beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {e:?}")) + .map(|result| result.data) + }) + .await + .map_err(|e| e.to_string())? + }; let beacon_vote = BeaconVote { block_root: attestation_data.beacon_block_root, @@ -157,4 +173,290 @@ impl MetadataService { Ok(()) } + + async fn weighted_calculation(&self, slot: Slot) -> Result { + let started = Instant::now(); + + let clients: Vec<(String, BeaconNodeHttpClient)> = { + let candidates = self.beacon_nodes.candidates.read().await; + candidates + .iter() + .map(|c| (c.beacon_node.to_string(), c.beacon_node.clone())) + .collect() + }; + + let num_clients = clients.len(); + + debug!(num_clients, "Starting weighted attestation data fetch"); + + // Spawn all fetch requests in parallel + let mut futures: FuturesUnordered<_> = clients + .into_iter() + .map(|(addr, client)| async move { + let result = self.fetch_and_score(&client, slot).await; + (addr, result) + }) + .collect(); + + let mut succeeded = 0; + let mut failed = 0; + let mut best_data: Option = None; + let mut soft_timeout_hit = false; + + // We have two timeouts: a soft timeout and a hard timeout. + // At the soft timeout, we return if we have any responses so far. + // At the hard timeout, we return unconditionally. + // The soft timeout is half the duration of the hard timeout. + + // Collect responses until soft timeout (500ms) + let soft_timeout = sleep(SOFT_TIMEOUT); + tokio::pin!(soft_timeout); + + loop { + tokio::select! { + biased; + + Some((addr, result)) = futures.next() => { + match result { + Ok(scored_attestation) => { + succeeded += 1; + trace!( + elapsed_ms = started.elapsed().as_millis(), + client = %scored_attestation.client_addr, + score = scored_attestation.score, + succeeded, + failed, + "Attestation data received" + ); + + // Update best if this score is higher + best_data = Some(match best_data { + Some(current) if current.score >= scored_attestation.score => current, + _ => { + debug!( + client = %scored_attestation.client_addr, + score = scored_attestation.score, + "New best attestation data" + ); + scored_attestation + } + }); + } + Err(e) => { + failed += 1; + warn!( + elapsed_ms = started.elapsed().as_millis(), + client = %addr, + error = %e, + succeeded, + failed, + "Failed to fetch attestation data" + ); + } + } + + // All responses received, exit early + if succeeded + failed == num_clients { + break; + } + } + + () = &mut soft_timeout, if !soft_timeout_hit => { + soft_timeout_hit = true; + debug!( + elapsed_ms = started.elapsed().as_millis(), + succeeded, + failed, + pending = num_clients - succeeded - failed, + "Soft timeout reached" + ); + + // If we have at least one response, return early + if best_data.is_some() { + break; + } + } + + else => break, + } + } + + // If no responses yet, wait until hard timeout (1s) + if best_data.is_none() && succeeded + failed < num_clients { + let remaining = HARD_TIMEOUT.saturating_sub(started.elapsed()); + let hard_timeout = sleep(remaining); + tokio::pin!(hard_timeout); + + loop { + tokio::select! { + biased; + + Some((addr, result)) = futures.next() => { + match result { + Ok(scored_attestation) => { + succeeded += 1; + trace!( + elapsed_ms = started.elapsed().as_millis(), + client = %scored_attestation.client_addr, + score = scored_attestation.score, + "Response received (hard timeout phase)" + ); + + best_data = Some(match best_data { + Some(current) if current.score >= scored_attestation.score => current, + _ => scored_attestation, + }); + } + Err(e) => { + failed += 1; + warn!( + client = %addr, + error = %e, + "Error in hard timeout phase" + ); + } + } + + if succeeded + failed == num_clients { + break; + } + } + + () = &mut hard_timeout => { + error!( + elapsed_ms = started.elapsed().as_millis(), + succeeded, + failed, + timed_out = num_clients - succeeded - failed, + "Hard timeout reached" + ); + break; + } + + else => break, + } + } + } + + // Return best result or error if none received + match best_data { + Some(scored_attestation) => { + debug!( + elapsed_ms = started.elapsed().as_millis(), + client = %scored_attestation.client_addr, + score = scored_attestation.score, + succeeded, + failed, + "Selected best attestation data" + ); + Ok(scored_attestation.attestation_data) + } + None => Err(format!( + "No attestation data received from any of {} beacon nodes (succeeded: {}, failed: {})", + num_clients, succeeded, failed + )), + } + } + + async fn fetch_and_score( + &self, + client: &BeaconNodeHttpClient, + slot: Slot, + ) -> Result { + let client_addr = client.to_string(); + + // Get attestation data + let attestation_data = client + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| format!("{client_addr}: {e:?}"))? + .data; + + // Calculate base score from checkpoint epochs (higher epochs = more recent) + let base_score = (attestation_data.source.epoch.as_u64() + + attestation_data.target.epoch.as_u64()) as f64; + + // Try to get head slot for bonus scoring + let score = match self + .get_block_slot(client, attestation_data.beacon_block_root) + .await + { + Some(head_slot) => { + let attestation_slot_u64 = slot.as_u64(); + let head_slot_u64 = head_slot.as_u64(); + + if head_slot_u64 <= attestation_slot_u64 { + // Increase score based on the nearness of the head slot + let distance = attestation_slot_u64 - head_slot_u64; + let bonus = 1.0 / (1 + distance) as f64; + + trace!( + client = %client_addr, + head_slot = head_slot_u64, + attestation_slot = attestation_slot_u64, + source_epoch = attestation_data.source.epoch.as_u64(), + target_epoch = attestation_data.target.epoch.as_u64(), + distance, + base_score, + bonus, + total_score = base_score + bonus, + "Scored attestation data" + ); + + base_score + bonus + } else { + warn!( + client = %client_addr, + head_slot = head_slot_u64, + attestation_slot = attestation_slot_u64, + "Block slot is the same or after attestation slot, skipping proximity bonus" + ); + base_score + } + } + None => { + trace!( + client = %client_addr, + base_score, + "Using base score only (no head slot)" + ); + base_score + } + }; + + Ok(ScoredAttestationData { + client_addr, + attestation_data, + score, + }) + } + + /// Get the slot number for a given block root with timeout + async fn get_block_slot( + &self, + client: &BeaconNodeHttpClient, + block_root: Hash256, + ) -> Option { + tokio::time::timeout(BLOCK_SLOT_LOOKUP_TIMEOUT, async { + client + .get_beacon_blocks::(BlockId::Root(block_root)) + .await + .ok() + .flatten() + .map(|resp| resp.data().slot()) + }) + .await + .ok() + .flatten() + } +} + +#[derive(Debug, Clone)] +struct ScoredAttestationData { + client_addr: String, + attestation_data: AttestationData, + score: f64, } + +#[cfg(test)] +mod tests; diff --git a/anchor/validator_store/src/metadata_service/tests.rs b/anchor/validator_store/src/metadata_service/tests.rs new file mode 100644 index 000000000..b973a8d1b --- /dev/null +++ b/anchor/validator_store/src/metadata_service/tests.rs @@ -0,0 +1,150 @@ +use types::{AttestationData, Checkpoint, Epoch, FixedBytesExtended, Hash256, Slot}; + +/// Calculate the score for attestation data based on checkpoint epochs and head slot proximity. +/// Extracted from fetch_and_score for testing. +fn calculate_attestation_score( + attestation_data: &AttestationData, + attestation_slot: Slot, + head_slot: Option, +) -> f64 { + let base_score = + (attestation_data.source.epoch.as_u64() + attestation_data.target.epoch.as_u64()) as f64; + + match head_slot { + Some(head_slot) => { + let attestation_slot_u64 = attestation_slot.as_u64(); + let head_slot_u64 = head_slot.as_u64(); + + if head_slot_u64 <= attestation_slot_u64 { + let distance = attestation_slot_u64 - head_slot_u64; + let bonus = 1.0 / (1 + distance) as f64; + base_score + bonus + } else { + base_score + } + } + None => base_score, + } +} + +fn create_test_attestation_data(source_epoch: u64, target_epoch: u64) -> AttestationData { + AttestationData { + slot: Slot::new(0), + index: 0, + beacon_block_root: Hash256::zero(), + source: Checkpoint { + epoch: Epoch::new(source_epoch), + root: Hash256::zero(), + }, + target: Checkpoint { + epoch: Epoch::new(target_epoch), + root: Hash256::zero(), + }, + } +} + +#[test] +fn test_scoring_higher_epochs_win() { + let newer = create_test_attestation_data(100, 101); + let older = create_test_attestation_data(99, 100); + + let slot = Slot::new(3232); + let head = Some(Slot::new(3230)); + + let newer_score = calculate_attestation_score(&newer, slot, head); + let older_score = calculate_attestation_score(&older, slot, head); + + assert!( + newer_score > older_score, + "Newer epochs should score higher. newer: {}, older: {}", + newer_score, + older_score + ); +} + +#[test] +fn test_scoring_proximity_bonus() { + let data = create_test_attestation_data(100, 101); + let attestation_slot = Slot::new(3232); + + // Distance 0 (same slot) should get maximum bonus (1.0) + let distance_zero = Some(Slot::new(3232)); + let score_zero = calculate_attestation_score(&data, attestation_slot, distance_zero); + + // Distance 1 should get smaller bonus (0.5) + let distance_one = Some(Slot::new(3231)); + let score_one = calculate_attestation_score(&data, attestation_slot, distance_one); + + assert!( + score_zero > score_one, + "Distance 0 should score higher than distance 1. distance_0: {}, distance_1: {}", + score_zero, + score_one + ); + + // Verify the actual bonus values + let base_score = 201.0; // 100 + 101 + assert_eq!( + score_zero, + base_score + 1.0, + "Distance 0 should give bonus of 1.0" + ); + assert_eq!( + score_one, + base_score + 0.5, + "Distance 1 should give bonus of 0.5" + ); +} + +#[test] +fn test_scoring_no_head_slot() { + let data = create_test_attestation_data(100, 101); + let attestation_slot = Slot::new(3232); + + let score = calculate_attestation_score(&data, attestation_slot, None); + + // Without head slot, should only get base score (no bonus) + let expected_base_score = 201.0; // 100 + 101 + assert_eq!( + score, expected_base_score, + "No head slot should give base score only" + ); +} + +#[test] +fn test_scoring_head_after_attestation_slot() { + let data = create_test_attestation_data(100, 101); + let attestation_slot = Slot::new(3230); + let future_head = Some(Slot::new(3232)); // Head is after attestation + + let score = calculate_attestation_score(&data, attestation_slot, future_head); + + // When head is after attestation slot, should not give bonus + let expected_base_score = 201.0; + assert_eq!( + score, expected_base_score, + "Future head slot should not give bonus" + ); +} + +#[test] +fn test_scoring_same_base_different_proximity() { + let data = create_test_attestation_data(100, 101); + let attestation_slot = Slot::new(3232); + + // Different head slots with same base score + let head1 = Some(Slot::new(3231)); // distance 1 + let head2 = Some(Slot::new(3230)); // distance 2 + + let score1 = calculate_attestation_score(&data, attestation_slot, head1); + let score2 = calculate_attestation_score(&data, attestation_slot, head2); + + // Same base, but different proximity bonuses + let base = 201.0; + assert_eq!(score1, base + 0.5, "Distance 1 should give bonus of 0.5"); + assert_eq!( + score2, + base + 1.0 / 3.0, + "Distance 2 should give bonus of 1/3" + ); +}