From 15a1ca7d1e1bafdc2d43ff70160b369ddcbf1b5d Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 25 Mar 2026 18:13:20 -0300 Subject: [PATCH 1/3] Port leanSpec PR #482: subnet filtering moves from store to P2P layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add --aggregate-subnet-ids CLI flag and multi-subnet subscription support. Nodes now subscribe to all subnets their validators belong to (for mesh health), and aggregators can additionally subscribe to explicit subnets. The store no longer filters attestations by subnet — it stores all gossip signatures unconditionally, since the P2P layer already ensures only relevant subnets are subscribed. Key changes: - SwarmConfig accepts all validator IDs instead of just the first - P2P layer computes subnet set from validators + explicit aggregator IDs - publish_attestation routes per-validator to correct subnet topic - Store removes ATTESTATION_COMMITTEE_COUNT constant and compute_subnet_id - on_gossip_attestation and on_block no longer take local_validator_ids --- bin/ethlambda/src/main.rs | 10 ++- crates/blockchain/src/lib.rs | 6 +- crates/blockchain/src/store.rs | 59 +++++------------- .../blockchain/tests/signature_spectests.rs | 2 +- crates/net/p2p/src/gossipsub/handler.rs | 19 +++++- crates/net/p2p/src/lib.rs | 62 ++++++++++++------- 6 files changed, 83 insertions(+), 75 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index f0260688..a475fb49 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -69,6 +69,10 @@ struct CliOptions { /// Number of attestation committees (subnets) per slot #[arg(long, default_value = "1", value_parser = clap::value_parser!(u64).range(1..))] attestation_committee_count: u64, + /// Subnet IDs this aggregator should subscribe to (comma-separated). + /// Requires --is-aggregator. Defaults to the subnets of the node's validators. + #[arg(long, value_delimiter = ',', requires = "is_aggregator")] + aggregate_subnet_ids: Option>, /// Directory for RocksDB storage #[arg(long, default_value = "./data")] data_dir: PathBuf, @@ -146,17 +150,17 @@ async fn main() -> eyre::Result<()> { .await .inspect_err(|err| error!(%err, "Failed to initialize state"))?; - // Use first validator ID for subnet subscription - let first_validator_id = validator_keys.keys().min().copied(); + let validator_ids: Vec = validator_keys.keys().copied().collect(); let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator); let built = build_swarm(SwarmConfig { node_key: node_p2p_key, bootnodes, listening_socket: p2p_socket, - validator_id: first_validator_id, + validator_ids, attestation_committee_count: options.attestation_committee_count, is_aggregator: options.is_aggregator, + aggregate_subnet_ids: options.aggregate_subnet_ids, }) .expect("failed to build swarm"); diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index d9652455..99b87e30 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -279,8 +279,7 @@ impl BlockChainServer { &mut self, signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { - let validator_ids = self.key_manager.validator_ids(); - store::on_block(&mut self.store, signed_block, &validator_ids)?; + store::on_block(&mut self.store, signed_block)?; metrics::update_head_slot(self.store.head_slot()); metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); @@ -454,8 +453,7 @@ impl BlockChainServer { warn!("Received unaggregated attestation but node is not an aggregator"); return; } - let validator_ids = self.key_manager.validator_ids(); - let _ = store::on_gossip_attestation(&mut self.store, attestation, &validator_ids) + let _ = store::on_gossip_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 03bd4f08..0708c781 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -26,16 +26,6 @@ use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; -/// Number of attestation committees per slot. -/// With ATTESTATION_COMMITTEE_COUNT = 1, all validators are in subnet 0. -const ATTESTATION_COMMITTEE_COUNT: u64 = 1; - -/// Compute the attestation subnet ID for a validator. -#[allow(clippy::modulo_one)] -fn compute_subnet_id(validator_id: u64) -> u64 { - validator_id % ATTESTATION_COMMITTEE_COUNT -} - /// Accept new aggregated payloads, promoting them to known for fork choice. fn accept_new_attestations(store: &mut Store, log_tree: bool) { store.promote_new_aggregated_payloads(); @@ -367,7 +357,6 @@ pub fn on_tick( pub fn on_gossip_attestation( store: &mut Store, signed_attestation: SignedAttestation, - local_validator_ids: &[u64], ) -> Result<(), StoreError> { let validator_id = signed_attestation.validator_id; let attestation = Attestation { @@ -407,16 +396,10 @@ pub fn on_gossip_attestation( // Store attestation data by root (content-addressed, idempotent) store.insert_attestation_data_by_root(data_root, attestation.data.clone()); - // Store gossip signature for later aggregation at interval 2, - // only if the attester is in the same subnet as one of our validators. - let attester_subnet = compute_subnet_id(validator_id); - let in_our_subnet = local_validator_ids - .iter() - .any(|&vid| compute_subnet_id(vid) == attester_subnet); - if in_our_subnet { - store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature); - metrics::update_gossip_signatures(store.gossip_signatures_count()); - } + // Store gossip signature unconditionally for later aggregation at interval 2. + // Subnet filtering is handled at the P2P subscription layer. + store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature); + metrics::update_gossip_signatures(store.gossip_signatures_count()); metrics::inc_attestations_valid(); @@ -524,9 +507,8 @@ pub fn on_gossip_aggregated_attestation( pub fn on_block( store: &mut Store, signed_block: SignedBlockWithAttestation, - local_validator_ids: &[u64], ) -> Result<(), StoreError> { - on_block_core(store, signed_block, true, local_validator_ids) + on_block_core(store, signed_block, true) } /// Process a new block without signature verification. @@ -537,7 +519,7 @@ pub fn on_block_without_verification( store: &mut Store, signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { - on_block_core(store, signed_block, false, &[]) + on_block_core(store, signed_block, false) } /// Core block processing logic. @@ -548,7 +530,6 @@ fn on_block_core( store: &mut Store, signed_block: SignedBlockWithAttestation, verify: bool, - local_validator_ids: &[u64], ) -> Result<(), StoreError> { let _timing = metrics::time_fork_choice_block_processing(); @@ -653,23 +634,17 @@ fn on_block_core( }; store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload); } else { - // Store the proposer's signature for potential future block building, - // only if the proposer is in the same subnet as one of our validators. - let proposer_subnet = compute_subnet_id(proposer_vid); - let in_our_subnet = local_validator_ids - .iter() - .any(|&vid| compute_subnet_id(vid) == proposer_subnet); - if in_our_subnet { - let proposer_sig = - ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) - .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature( - proposer_data_root, - proposer_attestation.data.slot, - proposer_vid, - proposer_sig, - ); - } + // Store the proposer's signature unconditionally for future block building. + // Subnet filtering is handled at the P2P subscription layer. + let proposer_sig = + ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) + .map_err(|_| StoreError::SignatureDecodingFailed)?; + store.insert_gossip_signature( + proposer_data_root, + proposer_attestation.data.slot, + proposer_vid, + proposer_sig, + ); } info!(%slot, %block_root, %state_root, "Processed new block"); diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index c35c9ebe..5d617e33 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -55,7 +55,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { store::on_tick(&mut st, block_time_ms, true, false); // Process the block (this includes signature verification) - let result = store::on_block(&mut st, signed_block, &[]); + let result = store::on_block(&mut st, signed_block); // Step 3: Check that it succeeded or failed as expected match (result.is_ok(), test.expect_exception.as_ref()) { diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 1406ecc4..08ae3a23 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -123,6 +123,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAttestation) { let slot = attestation.data.slot; let validator = attestation.validator_id; + let subnet_id = validator % server.attestation_committee_count; // Encode to SSZ let ssz_bytes = attestation.as_ssz_bytes(); @@ -130,13 +131,25 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + // Look up subscribed topic or construct on-the-fly for gossipsub fanout + let topic = server + .attestation_topics + .get(&subnet_id) + .cloned() + .unwrap_or_else(|| { + let network = "devnet0"; + let topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); + libp2p::gossipsub::IdentTopic::new(format!( + "/leanconsensus/{network}/{topic_kind}/ssz_snappy" + )) + }); + // Publish to the attestation subnet topic - server - .swarm_handle - .publish(server.attestation_topic.clone(), compressed); + server.swarm_handle.publish(topic, compressed); info!( %slot, validator, + subnet_id, target_slot = attestation.data.target.slot, target_root = %ShortRoot(&attestation.data.target.root.0), source_slot = attestation.data.source.slot, diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index f482cd33..b0b8031d 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -78,15 +78,17 @@ pub struct SwarmConfig { pub node_key: Vec, pub bootnodes: Vec, pub listening_socket: SocketAddr, - pub validator_id: Option, + pub validator_ids: Vec, pub attestation_committee_count: u64, pub is_aggregator: bool, + pub aggregate_subnet_ids: Option>, } /// Result of building the swarm — contains all pieces needed to start the P2P actor. pub struct BuiltSwarm { pub(crate) swarm: libp2p::Swarm, - pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) attestation_topics: HashMap, + pub(crate) attestation_committee_count: u64, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) bootnode_addrs: HashMap, @@ -205,34 +207,48 @@ pub fn build_swarm( .subscribe(&aggregation_topic) .unwrap(); - // Build attestation subnet topic (needed for publishing even without subscribing) - // attestation_committee_count is validated to be >= 1 by clap at CLI parse time. - let subnet_id = config - .validator_id + // Compute the set of subnets to subscribe to. + // Validators subscribe for gossipsub mesh health; aggregators additionally + // subscribe to any explicitly requested subnets. + let validator_subnets: HashSet = config + .validator_ids + .iter() .map(|vid| vid % config.attestation_committee_count) - .unwrap_or(0); - metrics::set_attestation_committee_subnet(subnet_id); + .collect(); - let attestation_topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); - let attestation_topic_str = - format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy"); - let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str); + let mut subscribe_subnets: HashSet = validator_subnets.clone(); - // Only aggregators subscribe to attestation subnets; non-aggregators - // publish via gossipsub's fanout mechanism without subscribing. if config.is_aggregator { - swarm - .behaviour_mut() - .gossipsub - .subscribe(&attestation_topic)?; - info!(%attestation_topic_kind, "Subscribed to attestation subnet"); + if let Some(ref explicit_ids) = config.aggregate_subnet_ids { + subscribe_subnets.extend(explicit_ids); + } + // Aggregator with no validators and no explicit subnets: fallback to subnet 0 + if subscribe_subnets.is_empty() { + subscribe_subnets.insert(0); + } + } + + // Report lowest validator subnet for backward-compatible metric + let metric_subnet = validator_subnets.iter().copied().min().unwrap_or(0); + metrics::set_attestation_committee_subnet(metric_subnet); + + // Build topics and subscribe + let mut attestation_topics: HashMap = HashMap::new(); + for &subnet_id in &subscribe_subnets { + let topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); + let topic_str = format!("/leanconsensus/{network}/{topic_kind}/ssz_snappy"); + let topic = libp2p::gossipsub::IdentTopic::new(topic_str); + swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + info!(%topic_kind, "Subscribed to attestation subnet"); + attestation_topics.insert(subnet_id, topic); } info!(socket=%config.listening_socket, "P2P node started"); Ok(BuiltSwarm { swarm, - attestation_topic, + attestation_topics, + attestation_committee_count: config.attestation_committee_count, block_topic, aggregation_topic, bootnode_addrs, @@ -255,7 +271,8 @@ impl P2P { swarm_handle, store, blockchain: None, - attestation_topic: built.attestation_topic, + attestation_topics: built.attestation_topics, + attestation_committee_count: built.attestation_committee_count, block_topic: built.block_topic, aggregation_topic: built.aggregation_topic, connected_peers: HashSet::new(), @@ -288,7 +305,8 @@ pub struct P2PServer { // BlockChain protocol ref (set via InitBlockChain message) pub(crate) blockchain: Option, - pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) attestation_topics: HashMap, + pub(crate) attestation_committee_count: u64, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, From 827457a457553d978322a69d5dcf3b0b7da1a549 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 25 Mar 2026 18:35:22 -0300 Subject: [PATCH 2/3] Extract attestation_subnet_topic helper and NETWORK_NAME constant Deduplicates topic string construction between build_swarm and publish_attestation, and consolidates the hardcoded devnet0 network name into a single constant in gossipsub/messages.rs. --- crates/net/p2p/src/gossipsub/handler.rs | 13 +++++-------- crates/net/p2p/src/gossipsub/messages.rs | 11 +++++++++++ crates/net/p2p/src/gossipsub/mod.rs | 4 +++- crates/net/p2p/src/lib.rs | 14 +++++--------- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 08ae3a23..72dcabc7 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -9,7 +9,10 @@ use tracing::{error, info, trace}; use super::{ encoding::{compress_message, decompress_message}, - messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}, + messages::{ + AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + attestation_subnet_topic, + }, }; use crate::P2PServer; @@ -136,13 +139,7 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte .attestation_topics .get(&subnet_id) .cloned() - .unwrap_or_else(|| { - let network = "devnet0"; - let topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); - libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{network}/{topic_kind}/ssz_snappy" - )) - }); + .unwrap_or_else(|| attestation_subnet_topic(subnet_id)); // Publish to the attestation subnet topic server.swarm_handle.publish(topic, compressed); diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs index 2b31eb6a..38f45e94 100644 --- a/crates/net/p2p/src/gossipsub/messages.rs +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -8,3 +8,14 @@ pub const ATTESTATION_SUBNET_TOPIC_PREFIX: &str = "attestation"; /// /// Full topic format: `/leanconsensus/{network}/aggregation/ssz_snappy` pub const AGGREGATION_TOPIC_KIND: &str = "aggregation"; + +// TODO: make this configurable (e.g., via GenesisConfig or CLI) +pub const NETWORK_NAME: &str = "devnet0"; + +/// Build an attestation subnet topic for the given subnet ID. +pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic { + let topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); + libp2p::gossipsub::IdentTopic::new(format!( + "/leanconsensus/{NETWORK_NAME}/{topic_kind}/ssz_snappy" + )) +} diff --git a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs index befd54c6..83c30558 100644 --- a/crates/net/p2p/src/gossipsub/mod.rs +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -6,4 +6,6 @@ pub use encoding::decompress_message; pub use handler::{ handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block, }; -pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}; +pub use messages::{ + AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic, +}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index b0b8031d..e74216e9 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -36,7 +36,7 @@ use tracing::{info, trace, warn}; use crate::{ gossipsub::{ - AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic, publish_aggregated_attestation, publish_attestation, publish_block, }, req_resp::{ @@ -186,10 +186,8 @@ pub fn build_swarm( .listen_on(addr) .expect("failed to bind gossipsub listening address"); - let network = "devnet0"; - // Subscribe to block topic (all nodes) - let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"); + let block_topic_str = format!("/leanconsensus/{NETWORK_NAME}/{BLOCK_TOPIC_KIND}/ssz_snappy"); let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str); swarm .behaviour_mut() @@ -199,7 +197,7 @@ pub fn build_swarm( // Subscribe to aggregation topic (all validators) let aggregation_topic_str = - format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); + format!("/leanconsensus/{NETWORK_NAME}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str); swarm .behaviour_mut() @@ -235,11 +233,9 @@ pub fn build_swarm( // Build topics and subscribe let mut attestation_topics: HashMap = HashMap::new(); for &subnet_id in &subscribe_subnets { - let topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); - let topic_str = format!("/leanconsensus/{network}/{topic_kind}/ssz_snappy"); - let topic = libp2p::gossipsub::IdentTopic::new(topic_str); + let topic = attestation_subnet_topic(subnet_id); swarm.behaviour_mut().gossipsub.subscribe(&topic)?; - info!(%topic_kind, "Subscribed to attestation subnet"); + info!(subnet_id, "Subscribed to attestation subnet"); attestation_topics.insert(subnet_id, topic); } From 259c8c3b4d756dd45943f43d1be05c7f05f6ac05 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 25 Mar 2026 19:07:09 -0300 Subject: [PATCH 3/3] Update crates/net/p2p/src/gossipsub/messages.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tomás Grüner <47506558+MegaRedHand@users.noreply.github.com> --- crates/net/p2p/src/gossipsub/messages.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs index 38f45e94..8b10b31a 100644 --- a/crates/net/p2p/src/gossipsub/messages.rs +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -14,8 +14,7 @@ pub const NETWORK_NAME: &str = "devnet0"; /// Build an attestation subnet topic for the given subnet ID. pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic { - let topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}"); libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{NETWORK_NAME}/{topic_kind}/ssz_snappy" + "/leanconsensus/{NETWORK_NAME}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy" )) }