diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 094600a..6b787c1 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -203,9 +203,11 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec known promotion. + store.insert_known_aggregated_payloads_batch(payload_entries); + metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); // Delete aggregated entries from gossip_signatures store.delete_gossip_signatures(&keys_to_delete); @@ -805,35 +807,26 @@ pub fn produce_block_with_signatures( }); } - // Single pass over known aggregated payloads: extract both attestation data and proofs - let known_payloads: Vec<_> = store.iter_known_aggregated_payloads().collect(); - - let known_attestations = - store.extract_latest_attestations(known_payloads.iter().map(|(key, _)| *key)); - let available_attestations: Vec = known_attestations - .into_iter() - .map(|(validator_id, data)| Attestation { validator_id, data }) - .collect(); - - // Get known block roots for attestation validation let known_block_roots = store.get_block_roots(); - // Collect existing proofs for block building from the already-fetched payloads - let aggregated_payloads: HashMap> = known_payloads - .into_iter() - .map(|(key, stored_payloads)| { - let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect(); - (key, proofs) - }) - .collect(); + // Group payloads by data_root with deduplicated proofs + let proofs_by_data_root = store.known_payloads_by_data_root(); + + // Resolve AttestationData for each data_root + let aggregated_payloads: HashMap)> = + proofs_by_data_root + .into_iter() + .filter_map(|(data_root, proofs)| { + let data = store.get_attestation_data_by_root(&data_root)?; + Some((data_root, (data, proofs))) + }) + .collect(); - // Build the block using fixed-point attestation collection - let (block, _post_state, signatures) = build_block( + let (block, signatures) = build_block( &head_state, slot, validator_index, head_root, - &available_attestations, &known_block_roots, &aggregated_payloads, )?; @@ -962,48 +955,64 @@ fn aggregation_bits_from_validator_indices(bits: &[u64]) -> AggregationBits { aggregation_bits } -/// Group individual attestations by their data and create aggregated attestations. +/// Greedily select proofs maximizing new validator coverage. /// -/// Attestations with identical `AttestationData` are combined into a single -/// `AggregatedAttestation` with a bitfield indicating participating validators. -fn aggregate_attestations_by_data(attestations: &[Attestation]) -> Vec { - // Group attestations by their data root - let mut groups: HashMap)> = HashMap::new(); - - for attestation in attestations { - let data_root = attestation.data.tree_hash_root(); - groups - .entry(data_root) - .or_insert_with(|| (attestation.data.clone(), Vec::new())) - .1 - .push(attestation.validator_id); +/// For a single attestation data entry, picks proofs that cover the most +/// uncovered validators. Each selected proof produces one AggregatedAttestation. +fn extend_proofs_greedily( + proofs: &[AggregatedSignatureProof], + selected_proofs: &mut Vec, + attestations: &mut Vec, + att_data: &AttestationData, +) { + if proofs.is_empty() { + return; } - // Convert groups into aggregated attestations - groups - .into_values() - .map(|(data, validator_ids)| { - // Find max validator id to determine bitlist capacity - let max_id = validator_ids.iter().copied().max().unwrap_or(0) as usize; - let mut bits = - AggregationBits::with_capacity(max_id + 1).expect("validator count exceeds limit"); - - for vid in validator_ids { - bits.set(vid as usize, true) - .expect("validator id exceeds capacity"); - } + let mut covered: HashSet = HashSet::new(); + let mut remaining_indices: HashSet = (0..proofs.len()).collect(); - AggregatedAttestation { - aggregation_bits: bits, - data, - } - }) - .collect() + while !remaining_indices.is_empty() { + // Pick proof covering the most uncovered validators + let best = remaining_indices + .iter() + .map(|&idx| { + let new_coverage: Vec = proofs[idx] + .participant_indices() + .filter(|vid| !covered.contains(vid)) + .collect(); + (idx, new_coverage) + }) + .max_by_key(|(_, cov)| cov.len()); + + let Some((best_idx, new_covered)) = best else { + break; + }; + if new_covered.is_empty() { + break; + } + + let proof = &proofs[best_idx]; + attestations.push(AggregatedAttestation { + aggregation_bits: proof.participants.clone(), + data: att_data.clone(), + }); + selected_proofs.push(proof.clone()); + + metrics::inc_pq_sig_aggregated_signatures(); + metrics::inc_pq_sig_attestations_in_aggregated_signatures(new_covered.len() as u64); + + covered.extend(new_covered); + remaining_indices.remove(&best_idx); + } } /// Build a valid block on top of this state. /// -/// Returns the block, post-state, and a list of attestation signature proofs +/// Works directly with aggregated payloads keyed by data_root, filtering +/// and selecting proofs without reconstructing individual attestations. +/// +/// Returns the block and a list of attestation signature proofs /// (one per attestation in block.body.attestations). The proposer signature /// proof is NOT included; it is appended by the caller. fn build_block( @@ -1011,90 +1020,89 @@ fn build_block( slot: u64, proposer_index: u64, parent_root: H256, - available_attestations: &[Attestation], known_block_roots: &HashSet, - aggregated_payloads: &HashMap>, -) -> Result<(Block, State, Vec), StoreError> { - // Start with empty attestation set - let mut included_attestations: Vec = Vec::new(); - - // Track which attestations we've already considered (by validator_id, data_root) - let mut included_keys: HashSet = HashSet::new(); - - // Fixed-point loop: collect attestations until no new ones can be added - loop { - // Aggregate attestations by data for the candidate block - let aggregated = aggregate_attestations_by_data(&included_attestations); - let attestations: AggregatedAttestations = aggregated - .clone() - .try_into() - .expect("attestation count exceeds limit"); - - // Create candidate block with current attestations (state_root is placeholder) - let candidate_block = Block { - slot, - proposer_index, - parent_root, - state_root: H256::ZERO, - body: BlockBody { attestations }, + aggregated_payloads: &HashMap)>, +) -> Result<(Block, Vec), StoreError> { + let mut aggregated_attestations: Vec = Vec::new(); + let mut aggregated_signatures: Vec = Vec::new(); + + if !aggregated_payloads.is_empty() { + // Genesis edge case: when building on genesis (slot 0), + // process_block_header will set latest_justified.root = parent_root. + // Derive this upfront so attestation filtering matches. + let mut current_justified = if head_state.latest_block_header.slot == 0 { + Checkpoint { + root: parent_root, + slot: head_state.latest_justified.slot, + } + } else { + head_state.latest_justified }; - // Apply state transition: process_slots + process_block - let mut post_state = head_state.clone(); - process_slots(&mut post_state, slot)?; - process_block(&mut post_state, &candidate_block)?; + let mut processed_data_roots: HashSet = HashSet::new(); - // No attestation source provided: done after computing post_state - if available_attestations.is_empty() || known_block_roots.is_empty() { - break; - } + // Sort by target.slot for deterministic processing order + let mut sorted_entries: Vec<_> = aggregated_payloads.iter().collect(); + sorted_entries.sort_by_key(|(_, (data, _))| data.target.slot); - // Find new valid attestations matching post-state requirements - let mut new_attestations: Vec = Vec::new(); + loop { + let mut found_new = false; - for attestation in available_attestations { - let data_root = attestation.data.tree_hash_root(); - let sig_key: SignatureKey = (attestation.validator_id, data_root); + for &(data_root, (att_data, proofs)) in &sorted_entries { + if processed_data_roots.contains(data_root) { + continue; + } + if !known_block_roots.contains(&att_data.head.root) { + continue; + } + if att_data.source != current_justified { + continue; + } - // Skip if target block is unknown - if !known_block_roots.contains(&attestation.data.head.root) { - continue; - } + processed_data_roots.insert(*data_root); + found_new = true; - // Skip if attestation source does not match post-state's latest justified - if attestation.data.source != post_state.latest_justified { - continue; + extend_proofs_greedily( + proofs, + &mut aggregated_signatures, + &mut aggregated_attestations, + att_data, + ); } - // Avoid adding duplicates of attestations already in the candidate set - if included_keys.contains(&sig_key) { - continue; + if !found_new { + break; } - // Only include if we have a proof for this attestation - if aggregated_payloads.contains_key(&sig_key) { - new_attestations.push(attestation.clone()); - included_keys.insert(sig_key); + // Check if justification advanced + let attestations: AggregatedAttestations = aggregated_attestations + .clone() + .try_into() + .expect("attestation count exceeds limit"); + let candidate = Block { + slot, + proposer_index, + parent_root, + state_root: H256::ZERO, + body: BlockBody { attestations }, + }; + let mut post_state = head_state.clone(); + process_slots(&mut post_state, slot)?; + process_block(&mut post_state, &candidate)?; + + if post_state.latest_justified != current_justified { + current_justified = post_state.latest_justified; + // Continue: new checkpoint may unlock more attestation data + } else { + break; } } - - // Fixed point reached: no new attestations found - if new_attestations.is_empty() { - break; - } - - // Add new attestations and continue iteration - included_attestations.extend(new_attestations); } - // Select existing proofs for the attestations to include in the block. - let (aggregated_attestations, aggregated_signatures) = - select_aggregated_proofs(&included_attestations, aggregated_payloads)?; - + // Build final block let attestations: AggregatedAttestations = aggregated_attestations .try_into() .expect("attestation count exceeds limit"); - let mut final_block = Block { slot, proposer_index, @@ -1102,81 +1110,12 @@ fn build_block( state_root: H256::ZERO, body: BlockBody { attestations }, }; - - // Recompute post-state with final block to get correct state root let mut post_state = head_state.clone(); process_slots(&mut post_state, slot)?; process_block(&mut post_state, &final_block)?; - final_block.state_root = post_state.tree_hash_root(); - Ok((final_block, post_state, aggregated_signatures)) -} - -/// Select existing aggregated proofs for attestations to include in a block. -/// -/// Fresh gossip aggregation happens at interval 2 (`aggregate_committee_signatures`). -/// This function only selects from existing proofs in the known aggregated payloads buffer -/// (proofs from previously received blocks and promoted gossip aggregations). -/// -/// Returns a list of (attestation, proof) pairs ready for block inclusion. -fn select_aggregated_proofs( - attestations: &[Attestation], - aggregated_payloads: &HashMap>, -) -> Result<(Vec, Vec), StoreError> { - let mut results = vec![]; - - for aggregated in aggregate_attestations_by_data(attestations) { - let data = &aggregated.data; - let message = data.tree_hash_root(); - - let mut remaining: HashSet = validator_indices(&aggregated.aggregation_bits).collect(); - - // Select existing proofs that cover the most remaining validators - while !remaining.is_empty() { - let Some(&target_id) = remaining.iter().next() else { - break; - }; - - let Some(candidates) = aggregated_payloads - .get(&(target_id, message)) - .filter(|v| !v.is_empty()) - else { - break; - }; - - let (proof, covered) = candidates - .iter() - .map(|p| { - let covered: Vec<_> = validator_indices(&p.participants) - .filter(|vid| remaining.contains(vid)) - .collect(); - (p, covered) - }) - .max_by_key(|(_, covered)| covered.len()) - .expect("candidates is not empty"); - - // No proof covers any remaining validator - if covered.is_empty() { - break; - } - - let aggregate = AggregatedAttestation { - aggregation_bits: proof.participants.clone(), - data: data.clone(), - }; - results.push((aggregate, proof.clone())); - - metrics::inc_pq_sig_aggregated_signatures(); - metrics::inc_pq_sig_attestations_in_aggregated_signatures(covered.len() as u64); - - for vid in covered { - remaining.remove(&vid); - } - } - } - - Ok(results.into_iter().unzip()) + Ok((final_block, aggregated_signatures)) } /// Verify all signatures in a signed block. diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index f2c380f..e7bed2a 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -14,8 +14,8 @@ use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ attestation::AttestationData, block::{ - Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, - SignedBlockWithAttestation, + AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, + BlockWithAttestation, SignedBlockWithAttestation, }, checkpoint::Checkpoint, primitives::{ @@ -148,6 +148,21 @@ impl PayloadBuffer { map } + /// Group entries by data_root, deduplicating proofs by participant bitfield. + fn grouped_by_data_root(&self) -> HashMap> { + let mut map: HashMap> = HashMap::new(); + let mut seen: HashMap>> = HashMap::new(); + for ((_vid, data_root), payload) in &self.entries { + let key_bytes = payload.proof.participants.as_ssz_bytes(); + if seen.entry(*data_root).or_default().insert(key_bytes) { + map.entry(*data_root) + .or_default() + .push(payload.proof.clone()); + } + } + map + } + /// Return deduplicated keys. fn unique_keys(&self) -> HashSet { self.entries.iter().map(|(key, _)| *key).collect() @@ -889,6 +904,11 @@ impl Store { // "Known" aggregated payloads are active in fork choice weight calculations. // Promoted from "new" payloads at specific intervals (0 with proposal, 4). + /// Group known payloads by data_root with deduplicated proofs. + pub fn known_payloads_by_data_root(&self) -> HashMap> { + self.known_payloads.lock().unwrap().grouped_by_data_root() + } + /// Iterates over all known aggregated payloads, grouped by key. pub fn iter_known_aggregated_payloads( &self,