diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index f026068..a475fb4 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -69,6 +69,10 @@ struct CliOptions { /// Number of attestation committees (subnets) per slot #[arg(long, default_value = "1", value_parser = clap::value_parser!(u64).range(1..))] attestation_committee_count: u64, + /// Subnet IDs this aggregator should subscribe to (comma-separated). + /// Requires --is-aggregator. Defaults to the subnets of the node's validators. + #[arg(long, value_delimiter = ',', requires = "is_aggregator")] + aggregate_subnet_ids: Option>, /// Directory for RocksDB storage #[arg(long, default_value = "./data")] data_dir: PathBuf, @@ -146,17 +150,17 @@ async fn main() -> eyre::Result<()> { .await .inspect_err(|err| error!(%err, "Failed to initialize state"))?; - // Use first validator ID for subnet subscription - let first_validator_id = validator_keys.keys().min().copied(); + let validator_ids: Vec = validator_keys.keys().copied().collect(); let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator); let built = build_swarm(SwarmConfig { node_key: node_p2p_key, bootnodes, listening_socket: p2p_socket, - validator_id: first_validator_id, + validator_ids, attestation_committee_count: options.attestation_committee_count, is_aggregator: options.is_aggregator, + aggregate_subnet_ids: options.aggregate_subnet_ids, }) .expect("failed to build swarm"); diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 6aea70d..5d3a9b9 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -279,8 +279,7 @@ impl BlockChainServer { &mut self, signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { - let validator_ids = self.key_manager.validator_ids(); - store::on_block(&mut self.store, signed_block, &validator_ids)?; + store::on_block(&mut self.store, signed_block)?; metrics::update_head_slot(self.store.head_slot()); metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); @@ -454,8 +453,7 @@ impl BlockChainServer { warn!("Received unaggregated attestation but node is not an aggregator"); return; } - let validator_ids = self.key_manager.validator_ids(); - let _ = store::on_gossip_attestation(&mut self.store, attestation, &validator_ids) + let _ = store::on_gossip_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 7fb6e34..00389f0 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -26,16 +26,6 @@ use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; -/// Number of attestation committees per slot. -/// With ATTESTATION_COMMITTEE_COUNT = 1, all validators are in subnet 0. -const ATTESTATION_COMMITTEE_COUNT: u64 = 1; - -/// Compute the attestation subnet ID for a validator. -#[allow(clippy::modulo_one)] -fn compute_subnet_id(validator_id: u64) -> u64 { - validator_id % ATTESTATION_COMMITTEE_COUNT -} - /// Accept new aggregated payloads, promoting them to known for fork choice. fn accept_new_attestations(store: &mut Store, log_tree: bool) { store.promote_new_aggregated_payloads(); @@ -367,7 +357,6 @@ pub fn on_tick( pub fn on_gossip_attestation( store: &mut Store, signed_attestation: SignedAttestation, - local_validator_ids: &[u64], ) -> Result<(), StoreError> { let validator_id = signed_attestation.validator_id; let attestation = Attestation { @@ -407,16 +396,10 @@ pub fn on_gossip_attestation( // Store attestation data by root (content-addressed, idempotent) store.insert_attestation_data_by_root(data_root, attestation.data.clone()); - // Store gossip signature for later aggregation at interval 2, - // only if the attester is in the same subnet as one of our validators. - let attester_subnet = compute_subnet_id(validator_id); - let in_our_subnet = local_validator_ids - .iter() - .any(|&vid| compute_subnet_id(vid) == attester_subnet); - if in_our_subnet { - store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature); - metrics::update_gossip_signatures(store.gossip_signatures_count()); - } + // Store gossip signature unconditionally for later aggregation at interval 2. + // Subnet filtering is handled at the P2P subscription layer. + store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature); + metrics::update_gossip_signatures(store.gossip_signatures_count()); metrics::inc_attestations_valid(); @@ -524,9 +507,8 @@ pub fn on_gossip_aggregated_attestation( pub fn on_block( store: &mut Store, signed_block: SignedBlockWithAttestation, - local_validator_ids: &[u64], ) -> Result<(), StoreError> { - on_block_core(store, signed_block, true, local_validator_ids) + on_block_core(store, signed_block, true) } /// Process a new block without signature verification. @@ -537,7 +519,7 @@ pub fn on_block_without_verification( store: &mut Store, signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { - on_block_core(store, signed_block, false, &[]) + on_block_core(store, signed_block, false) } /// Core block processing logic. @@ -548,7 +530,6 @@ fn on_block_core( store: &mut Store, signed_block: SignedBlockWithAttestation, verify: bool, - local_validator_ids: &[u64], ) -> Result<(), StoreError> { let _timing = metrics::time_fork_choice_block_processing(); @@ -653,23 +634,17 @@ fn on_block_core( }; store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload); } else { - // Store the proposer's signature for potential future block building, - // only if the proposer is in the same subnet as one of our validators. - let proposer_subnet = compute_subnet_id(proposer_vid); - let in_our_subnet = local_validator_ids - .iter() - .any(|&vid| compute_subnet_id(vid) == proposer_subnet); - if in_our_subnet { - let proposer_sig = - ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) - .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature( - proposer_data_root, - proposer_attestation.data.slot, - proposer_vid, - proposer_sig, - ); - } + // Store the proposer's signature unconditionally for future block building. + // Subnet filtering is handled at the P2P subscription layer. + let proposer_sig = + ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) + .map_err(|_| StoreError::SignatureDecodingFailed)?; + store.insert_gossip_signature( + proposer_data_root, + proposer_attestation.data.slot, + proposer_vid, + proposer_sig, + ); } info!(%slot, %block_root, %state_root, "Processed new block"); diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 39c7987..9566a88 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -55,7 +55,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { store::on_tick(&mut st, block_time_ms, true, false); // Process the block (this includes signature verification) - let result = store::on_block(&mut st, signed_block, &[]); + let result = store::on_block(&mut st, signed_block); // Step 3: Check that it succeeded or failed as expected match (result.is_ok(), test.expect_exception.as_ref()) { diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 5b8e0ef..c48cf2d 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -9,7 +9,10 @@ use tracing::{error, info, trace}; use super::{ encoding::{compress_message, decompress_message}, - messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}, + messages::{ + AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + attestation_subnet_topic, + }, }; use crate::P2PServer; @@ -123,6 +126,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAttestation) { let slot = attestation.data.slot; let validator = attestation.validator_id; + let subnet_id = validator % server.attestation_committee_count; // Encode to SSZ let ssz_bytes = attestation.as_ssz_bytes(); @@ -130,13 +134,19 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + // Look up subscribed topic or construct on-the-fly for gossipsub fanout + let topic = server + .attestation_topics + .get(&subnet_id) + .cloned() + .unwrap_or_else(|| attestation_subnet_topic(subnet_id)); + // Publish to the attestation subnet topic - server - .swarm_handle - .publish(server.attestation_topic.clone(), compressed); + server.swarm_handle.publish(topic, compressed); info!( %slot, validator, + subnet_id, target_slot = attestation.data.target.slot, target_root = %ShortRoot(&attestation.data.target.root.0), source_slot = attestation.data.source.slot, diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs index 2b31eb6..8b10b31 100644 --- a/crates/net/p2p/src/gossipsub/messages.rs +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -8,3 +8,13 @@ pub const ATTESTATION_SUBNET_TOPIC_PREFIX: &str = "attestation"; /// /// Full topic format: `/leanconsensus/{network}/aggregation/ssz_snappy` pub const AGGREGATION_TOPIC_KIND: &str = "aggregation"; + +// TODO: make this configurable (e.g., via GenesisConfig or CLI) +pub const NETWORK_NAME: &str = "devnet0"; + +/// Build an attestation subnet topic for the given subnet ID. +pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic { + libp2p::gossipsub::IdentTopic::new(format!( + "/leanconsensus/{NETWORK_NAME}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy" + )) +} diff --git a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs index befd54c..83c3055 100644 --- a/crates/net/p2p/src/gossipsub/mod.rs +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -6,4 +6,6 @@ pub use encoding::decompress_message; pub use handler::{ handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block, }; -pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}; +pub use messages::{ + AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic, +}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index f482cd3..e74216e 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -36,7 +36,7 @@ use tracing::{info, trace, warn}; use crate::{ gossipsub::{ - AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic, publish_aggregated_attestation, publish_attestation, publish_block, }, req_resp::{ @@ -78,15 +78,17 @@ pub struct SwarmConfig { pub node_key: Vec, pub bootnodes: Vec, pub listening_socket: SocketAddr, - pub validator_id: Option, + pub validator_ids: Vec, pub attestation_committee_count: u64, pub is_aggregator: bool, + pub aggregate_subnet_ids: Option>, } /// Result of building the swarm — contains all pieces needed to start the P2P actor. pub struct BuiltSwarm { pub(crate) swarm: libp2p::Swarm, - pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) attestation_topics: HashMap, + pub(crate) attestation_committee_count: u64, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) bootnode_addrs: HashMap, @@ -184,10 +186,8 @@ pub fn build_swarm( .listen_on(addr) .expect("failed to bind gossipsub listening address"); - let network = "devnet0"; - // Subscribe to block topic (all nodes) - let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"); + let block_topic_str = format!("/leanconsensus/{NETWORK_NAME}/{BLOCK_TOPIC_KIND}/ssz_snappy"); let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str); swarm .behaviour_mut() @@ -197,7 +197,7 @@ pub fn build_swarm( // Subscribe to aggregation topic (all validators) let aggregation_topic_str = - format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); + format!("/leanconsensus/{NETWORK_NAME}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str); swarm .behaviour_mut() @@ -205,34 +205,46 @@ pub fn build_swarm( .subscribe(&aggregation_topic) .unwrap(); - // Build attestation subnet topic (needed for publishing even without subscribing) - // attestation_committee_count is validated to be >= 1 by clap at CLI parse time. - let subnet_id = config - .validator_id + // Compute the set of subnets to subscribe to. + // Validators subscribe for gossipsub mesh health; aggregators additionally + // subscribe to any explicitly requested subnets. + let validator_subnets: HashSet = config + .validator_ids + .iter() .map(|vid| vid % config.attestation_committee_count) - .unwrap_or(0); - metrics::set_attestation_committee_subnet(subnet_id); + .collect(); - let attestation_topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); - let attestation_topic_str = - format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy"); - let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str); + let mut subscribe_subnets: HashSet = validator_subnets.clone(); - // Only aggregators subscribe to attestation subnets; non-aggregators - // publish via gossipsub's fanout mechanism without subscribing. if config.is_aggregator { - swarm - .behaviour_mut() - .gossipsub - .subscribe(&attestation_topic)?; - info!(%attestation_topic_kind, "Subscribed to attestation subnet"); + if let Some(ref explicit_ids) = config.aggregate_subnet_ids { + subscribe_subnets.extend(explicit_ids); + } + // Aggregator with no validators and no explicit subnets: fallback to subnet 0 + if subscribe_subnets.is_empty() { + subscribe_subnets.insert(0); + } + } + + // Report lowest validator subnet for backward-compatible metric + let metric_subnet = validator_subnets.iter().copied().min().unwrap_or(0); + metrics::set_attestation_committee_subnet(metric_subnet); + + // Build topics and subscribe + let mut attestation_topics: HashMap = HashMap::new(); + for &subnet_id in &subscribe_subnets { + let topic = attestation_subnet_topic(subnet_id); + swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + info!(subnet_id, "Subscribed to attestation subnet"); + attestation_topics.insert(subnet_id, topic); } info!(socket=%config.listening_socket, "P2P node started"); Ok(BuiltSwarm { swarm, - attestation_topic, + attestation_topics, + attestation_committee_count: config.attestation_committee_count, block_topic, aggregation_topic, bootnode_addrs, @@ -255,7 +267,8 @@ impl P2P { swarm_handle, store, blockchain: None, - attestation_topic: built.attestation_topic, + attestation_topics: built.attestation_topics, + attestation_committee_count: built.attestation_committee_count, block_topic: built.block_topic, aggregation_topic: built.aggregation_topic, connected_peers: HashSet::new(), @@ -288,7 +301,8 @@ pub struct P2PServer { // BlockChain protocol ref (set via InitBlockChain message) pub(crate) blockchain: Option, - pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) attestation_topics: HashMap, + pub(crate) attestation_committee_count: u64, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,