Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ struct CliOptions {
/// Number of attestation committees (subnets) per slot
#[arg(long, default_value = "1", value_parser = clap::value_parser!(u64).range(1..))]
attestation_committee_count: u64,
/// Subnet IDs this aggregator should subscribe to (comma-separated).
/// Requires --is-aggregator. Defaults to the subnets of the node's validators.
#[arg(long, value_delimiter = ',', requires = "is_aggregator")]
aggregate_subnet_ids: Option<Vec<u64>>,
/// Directory for RocksDB storage
#[arg(long, default_value = "./data")]
data_dir: PathBuf,
Expand Down Expand Up @@ -146,17 +150,17 @@ async fn main() -> eyre::Result<()> {
.await
.inspect_err(|err| error!(%err, "Failed to initialize state"))?;

// Use first validator ID for subnet subscription
let first_validator_id = validator_keys.keys().min().copied();
let validator_ids: Vec<u64> = validator_keys.keys().copied().collect();
let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator);

let built = build_swarm(SwarmConfig {
node_key: node_p2p_key,
bootnodes,
listening_socket: p2p_socket,
validator_id: first_validator_id,
validator_ids,
attestation_committee_count: options.attestation_committee_count,
is_aggregator: options.is_aggregator,
aggregate_subnet_ids: options.aggregate_subnet_ids,
})
.expect("failed to build swarm");

Expand Down
6 changes: 2 additions & 4 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ impl BlockChainServer {
&mut self,
signed_block: SignedBlockWithAttestation,
) -> Result<(), StoreError> {
let validator_ids = self.key_manager.validator_ids();
store::on_block(&mut self.store, signed_block, &validator_ids)?;
store::on_block(&mut self.store, signed_block)?;
metrics::update_head_slot(self.store.head_slot());
metrics::update_latest_justified_slot(self.store.latest_justified().slot);
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
Expand Down Expand Up @@ -454,8 +453,7 @@ impl BlockChainServer {
warn!("Received unaggregated attestation but node is not an aggregator");
return;
}
let validator_ids = self.key_manager.validator_ids();
let _ = store::on_gossip_attestation(&mut self.store, attestation, &validator_ids)
let _ = store::on_gossip_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}

Expand Down
59 changes: 17 additions & 42 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@ use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT

const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3;

/// Number of attestation committees per slot.
/// With ATTESTATION_COMMITTEE_COUNT = 1, all validators are in subnet 0.
const ATTESTATION_COMMITTEE_COUNT: u64 = 1;

/// Compute the attestation subnet ID for a validator.
#[allow(clippy::modulo_one)]
fn compute_subnet_id(validator_id: u64) -> u64 {
validator_id % ATTESTATION_COMMITTEE_COUNT
}

/// Accept new aggregated payloads, promoting them to known for fork choice.
fn accept_new_attestations(store: &mut Store, log_tree: bool) {
store.promote_new_aggregated_payloads();
Expand Down Expand Up @@ -367,7 +357,6 @@ pub fn on_tick(
pub fn on_gossip_attestation(
store: &mut Store,
signed_attestation: SignedAttestation,
local_validator_ids: &[u64],
) -> Result<(), StoreError> {
let validator_id = signed_attestation.validator_id;
let attestation = Attestation {
Expand Down Expand Up @@ -407,16 +396,10 @@ pub fn on_gossip_attestation(
// Store attestation data by root (content-addressed, idempotent)
store.insert_attestation_data_by_root(data_root, attestation.data.clone());

// Store gossip signature for later aggregation at interval 2,
// only if the attester is in the same subnet as one of our validators.
let attester_subnet = compute_subnet_id(validator_id);
let in_our_subnet = local_validator_ids
.iter()
.any(|&vid| compute_subnet_id(vid) == attester_subnet);
if in_our_subnet {
store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature);
metrics::update_gossip_signatures(store.gossip_signatures_count());
}
// Store gossip signature unconditionally for later aggregation at interval 2.
// Subnet filtering is handled at the P2P subscription layer.
store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature);
metrics::update_gossip_signatures(store.gossip_signatures_count());

metrics::inc_attestations_valid();

Expand Down Expand Up @@ -524,9 +507,8 @@ pub fn on_gossip_aggregated_attestation(
pub fn on_block(
store: &mut Store,
signed_block: SignedBlockWithAttestation,
local_validator_ids: &[u64],
) -> Result<(), StoreError> {
on_block_core(store, signed_block, true, local_validator_ids)
on_block_core(store, signed_block, true)
}

/// Process a new block without signature verification.
Expand All @@ -537,7 +519,7 @@ pub fn on_block_without_verification(
store: &mut Store,
signed_block: SignedBlockWithAttestation,
) -> Result<(), StoreError> {
on_block_core(store, signed_block, false, &[])
on_block_core(store, signed_block, false)
}

/// Core block processing logic.
Expand All @@ -548,7 +530,6 @@ fn on_block_core(
store: &mut Store,
signed_block: SignedBlockWithAttestation,
verify: bool,
local_validator_ids: &[u64],
) -> Result<(), StoreError> {
let _timing = metrics::time_fork_choice_block_processing();

Expand Down Expand Up @@ -653,23 +634,17 @@ fn on_block_core(
};
store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload);
} else {
// Store the proposer's signature for potential future block building,
// only if the proposer is in the same subnet as one of our validators.
let proposer_subnet = compute_subnet_id(proposer_vid);
let in_our_subnet = local_validator_ids
.iter()
.any(|&vid| compute_subnet_id(vid) == proposer_subnet);
if in_our_subnet {
let proposer_sig =
ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature)
.map_err(|_| StoreError::SignatureDecodingFailed)?;
store.insert_gossip_signature(
proposer_data_root,
proposer_attestation.data.slot,
proposer_vid,
proposer_sig,
);
}
// Store the proposer's signature unconditionally for future block building.
// Subnet filtering is handled at the P2P subscription layer.
let proposer_sig =
ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature)
.map_err(|_| StoreError::SignatureDecodingFailed)?;
store.insert_gossip_signature(
proposer_data_root,
proposer_attestation.data.slot,
proposer_vid,
proposer_sig,
);
}

info!(%slot, %block_root, %state_root, "Processed new block");
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain/tests/signature_spectests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
store::on_tick(&mut st, block_time_ms, true, false);

// Process the block (this includes signature verification)
let result = store::on_block(&mut st, signed_block, &[]);
let result = store::on_block(&mut st, signed_block);

// Step 3: Check that it succeeded or failed as expected
match (result.is_ok(), test.expect_exception.as_ref()) {
Expand Down
18 changes: 14 additions & 4 deletions crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use tracing::{error, info, trace};

use super::{
encoding::{compress_message, decompress_message},
messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND},
messages::{
AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND,
attestation_subnet_topic,
},
};
use crate::P2PServer;

Expand Down Expand Up @@ -123,20 +126,27 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAttestation) {
let slot = attestation.data.slot;
let validator = attestation.validator_id;
let subnet_id = validator % server.attestation_committee_count;

// Encode to SSZ
let ssz_bytes = attestation.as_ssz_bytes();

// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

// Look up subscribed topic or construct on-the-fly for gossipsub fanout
let topic = server
.attestation_topics
.get(&subnet_id)
.cloned()
.unwrap_or_else(|| attestation_subnet_topic(subnet_id));

// Publish to the attestation subnet topic
server
.swarm_handle
.publish(server.attestation_topic.clone(), compressed);
server.swarm_handle.publish(topic, compressed);
info!(
%slot,
validator,
subnet_id,
target_slot = attestation.data.target.slot,
target_root = %ShortRoot(&attestation.data.target.root.0),
source_slot = attestation.data.source.slot,
Expand Down
10 changes: 10 additions & 0 deletions crates/net/p2p/src/gossipsub/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ pub const ATTESTATION_SUBNET_TOPIC_PREFIX: &str = "attestation";
///
/// Full topic format: `/leanconsensus/{network}/aggregation/ssz_snappy`
pub const AGGREGATION_TOPIC_KIND: &str = "aggregation";

// TODO: make this configurable (e.g., via GenesisConfig or CLI)
pub const NETWORK_NAME: &str = "devnet0";

/// Build an attestation subnet topic for the given subnet ID.
pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic {
libp2p::gossipsub::IdentTopic::new(format!(
"/leanconsensus/{NETWORK_NAME}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy"
))
}
4 changes: 3 additions & 1 deletion crates/net/p2p/src/gossipsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ pub use encoding::decompress_message;
pub use handler::{
handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block,
};
pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND};
pub use messages::{
AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic,
};
68 changes: 41 additions & 27 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::{info, trace, warn};

use crate::{
gossipsub::{
AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND,
AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic,
publish_aggregated_attestation, publish_attestation, publish_block,
},
req_resp::{
Expand Down Expand Up @@ -78,15 +78,17 @@ pub struct SwarmConfig {
pub node_key: Vec<u8>,
pub bootnodes: Vec<Bootnode>,
pub listening_socket: SocketAddr,
pub validator_id: Option<u64>,
pub validator_ids: Vec<u64>,
pub attestation_committee_count: u64,
pub is_aggregator: bool,
pub aggregate_subnet_ids: Option<Vec<u64>>,
}

/// Result of building the swarm — contains all pieces needed to start the P2P actor.
pub struct BuiltSwarm {
pub(crate) swarm: libp2p::Swarm<Behaviour>,
pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic,
pub(crate) attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic>,
pub(crate) attestation_committee_count: u64,
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
pub(crate) bootnode_addrs: HashMap<PeerId, Multiaddr>,
Expand Down Expand Up @@ -184,10 +186,8 @@ pub fn build_swarm(
.listen_on(addr)
.expect("failed to bind gossipsub listening address");

let network = "devnet0";

// Subscribe to block topic (all nodes)
let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy");
let block_topic_str = format!("/leanconsensus/{NETWORK_NAME}/{BLOCK_TOPIC_KIND}/ssz_snappy");
let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str);
swarm
.behaviour_mut()
Expand All @@ -197,42 +197,54 @@ pub fn build_swarm(

// Subscribe to aggregation topic (all validators)
let aggregation_topic_str =
format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy");
format!("/leanconsensus/{NETWORK_NAME}/{AGGREGATION_TOPIC_KIND}/ssz_snappy");
let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str);
swarm
.behaviour_mut()
.gossipsub
.subscribe(&aggregation_topic)
.unwrap();

// Build attestation subnet topic (needed for publishing even without subscribing)
// attestation_committee_count is validated to be >= 1 by clap at CLI parse time.
let subnet_id = config
.validator_id
// Compute the set of subnets to subscribe to.
// Validators subscribe for gossipsub mesh health; aggregators additionally
// subscribe to any explicitly requested subnets.
let validator_subnets: HashSet<u64> = config
.validator_ids
.iter()
.map(|vid| vid % config.attestation_committee_count)
.unwrap_or(0);
metrics::set_attestation_committee_subnet(subnet_id);
.collect();

let attestation_topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}");
let attestation_topic_str =
format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy");
let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str);
let mut subscribe_subnets: HashSet<u64> = validator_subnets.clone();

// Only aggregators subscribe to attestation subnets; non-aggregators
// publish via gossipsub's fanout mechanism without subscribing.
if config.is_aggregator {
swarm
.behaviour_mut()
.gossipsub
.subscribe(&attestation_topic)?;
info!(%attestation_topic_kind, "Subscribed to attestation subnet");
if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
subscribe_subnets.extend(explicit_ids);
}
// Aggregator with no validators and no explicit subnets: fallback to subnet 0
if subscribe_subnets.is_empty() {
subscribe_subnets.insert(0);
}
Comment on lines 219 to +226
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 No bounds-check on explicit aggregate_subnet_ids

The --aggregate-subnet-ids values provided via CLI are added directly to subscribe_subnets without being validated against attestation_committee_count. An operator running e.g. --attestation-committee-count 2 --aggregate-subnet-ids 0,1,5 will silently subscribe to subnet 5, which falls outside the valid range [0, committee_count). This could cause the node to subscribe to a topic that no other peer publishes to, wasting a gossipsub mesh slot, and creates a mismatch between the node's subscriptions and the routing logic in publish_attestation (which computes validator % committee_count).

Consider adding a validation step here:

if config.is_aggregator {
    if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
        for &id in explicit_ids {
            if id >= config.attestation_committee_count {
                // return an error or log a warning
                warn!(subnet_id = id, committee_count = config.attestation_committee_count,
                      "aggregate_subnet_ids contains out-of-range subnet ID");
            }
        }
        subscribe_subnets.extend(explicit_ids);
    }
    ...
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/p2p/src/lib.rs
Line: 221-228

Comment:
**No bounds-check on explicit `aggregate_subnet_ids`**

The `--aggregate-subnet-ids` values provided via CLI are added directly to `subscribe_subnets` without being validated against `attestation_committee_count`. An operator running e.g. `--attestation-committee-count 2 --aggregate-subnet-ids 0,1,5` will silently subscribe to subnet 5, which falls outside the valid range `[0, committee_count)`. This could cause the node to subscribe to a topic that no other peer publishes to, wasting a gossipsub mesh slot, and creates a mismatch between the node's subscriptions and the routing logic in `publish_attestation` (which computes `validator % committee_count`).

Consider adding a validation step here:

```rust
if config.is_aggregator {
    if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
        for &id in explicit_ids {
            if id >= config.attestation_committee_count {
                // return an error or log a warning
                warn!(subnet_id = id, committee_count = config.attestation_committee_count,
                      "aggregate_subnet_ids contains out-of-range subnet ID");
            }
        }
        subscribe_subnets.extend(explicit_ids);
    }
    ...
}
```

How can I resolve this? If you propose a fix, please make it concise.

}

// Report lowest validator subnet for backward-compatible metric
let metric_subnet = validator_subnets.iter().copied().min().unwrap_or(0);
metrics::set_attestation_committee_subnet(metric_subnet);

// Build topics and subscribe
let mut attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic> = HashMap::new();
for &subnet_id in &subscribe_subnets {
let topic = attestation_subnet_topic(subnet_id);
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
info!(subnet_id, "Subscribed to attestation subnet");
attestation_topics.insert(subnet_id, topic);
}

info!(socket=%config.listening_socket, "P2P node started");

Ok(BuiltSwarm {
swarm,
attestation_topic,
attestation_topics,
attestation_committee_count: config.attestation_committee_count,
block_topic,
aggregation_topic,
bootnode_addrs,
Expand All @@ -255,7 +267,8 @@ impl P2P {
swarm_handle,
store,
blockchain: None,
attestation_topic: built.attestation_topic,
attestation_topics: built.attestation_topics,
attestation_committee_count: built.attestation_committee_count,
block_topic: built.block_topic,
aggregation_topic: built.aggregation_topic,
connected_peers: HashSet::new(),
Expand Down Expand Up @@ -288,7 +301,8 @@ pub struct P2PServer {
// BlockChain protocol ref (set via InitBlockChain message)
pub(crate) blockchain: Option<P2PToBlockChainRef>,

pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic,
pub(crate) attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic>,
pub(crate) attestation_committee_count: u64,
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,

Expand Down
Loading