diff --git a/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py b/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py index 669d2daf..31b70bf2 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py +++ b/packages/testing/src/consensus_testing/test_fixtures/api_endpoint.py @@ -173,6 +173,11 @@ def _metrics_response(_store: Store, _fixture: "ApiEndpointTest") -> dict[str, A "lean_attestation_aggregate_coverage_validators", "lean_attestation_aggregate_coverage_subnets", "lean_attestation_aggregate_coverage_diff_validators", + "lean_block_proposal_attestation_build_phase_seconds", + "lean_block_proposal_attestation_builds_total", + "lean_block_proposal_child_payloads_consumed_total", + "lean_block_proposal_attestation_data_selected", + "lean_block_proposal_aggregates_selected", "lean_latest_justified_slot", "lean_latest_finalized_slot", "lean_state_transition_time_seconds", diff --git a/packages/testing/src/consensus_testing/test_types/block_spec.py b/packages/testing/src/consensus_testing/test_types/block_spec.py index f95b2f45..4b736fdd 100644 --- a/packages/testing/src/consensus_testing/test_types/block_spec.py +++ b/packages/testing/src/consensus_testing/test_types/block_spec.py @@ -523,7 +523,7 @@ def build_signed_block_with_store( # Aggregation runs on a local clone: gossip pools mutate here, but the # caller's gossip-signature view must not be consumed by this simulated # build. Only the freshly aggregated Type-1 payloads propagate back. - aggregation_store, _ = spec.aggregate(store) + aggregation_store, _ = spec.aggregate(store, skip_trivial_inputs=False) merged_store = spec.accept_new_attestations(aggregation_store) # Build the block through the spec's State.build_block(). diff --git a/src/lean_spec/forks/lstar/spec.py b/src/lean_spec/forks/lstar/spec.py index e2e1b992..73288eb4 100644 --- a/src/lean_spec/forks/lstar/spec.py +++ b/src/lean_spec/forks/lstar/spec.py @@ -1,6 +1,7 @@ """Lstar fork — identity and construction facade.""" import math +import time from collections import defaultdict from collections.abc import Iterable, Sequence, Set as AbstractSet from typing import Any, ClassVar @@ -32,6 +33,7 @@ JUSTIFICATION_LOOKBACK_SLOTS, MAX_ATTESTATIONS_DATA, ) +from lean_spec.subspecs.metrics.registry import registry as metrics from lean_spec.subspecs.observability import ( observe_on_attestation, observe_on_block, @@ -702,8 +704,10 @@ def build_block( ) processed_att_data: set[AttestationData] = set() + child_payloads_consumed = 0 while True: + select_start = time.perf_counter() found_entries = False for att_data, proofs in sorted( @@ -759,6 +763,7 @@ def build_block( found_entries = True selected, _ = TypeOneMultiSignature.select_greedily(proofs) + child_payloads_consumed += len(selected) aggregated_signatures.extend(selected) for proof in selected: aggregated_attestations.append( @@ -768,9 +773,15 @@ def build_block( ) ) + metrics.lean_block_proposal_attestation_build_phase_seconds.labels( + phase="select_payloads", + ).observe(time.perf_counter() - select_start) + if not found_entries: break + stf_start = time.perf_counter() + # Build candidate block and check if justification changed. candidate_block = self.block_class( slot=slot, @@ -794,11 +805,17 @@ def build_block( post_state.latest_justified != current_justified or post_state.latest_finalized.slot != current_finalized_slot ): + metrics.lean_block_proposal_attestation_build_phase_seconds.labels( + phase="stf_simulate", + ).observe(time.perf_counter() - stf_start) current_justified = post_state.latest_justified current_justified_slots = post_state.justified_slots current_finalized_slot = post_state.latest_finalized.slot continue + metrics.lean_block_proposal_attestation_build_phase_seconds.labels( + phase="stf_simulate", + ).observe(time.perf_counter() - stf_start) break # Compact: merge all proofs sharing the same AttestationData into one @@ -807,6 +824,7 @@ def build_block( # During the fixed-point loop above, multiple proofs may have been # selected for the same AttestationData across iterations. Group them # and merge each group into a single recursive proof. + compact_start = time.perf_counter() proof_groups: dict[AttestationData, list[TypeOneMultiSignature]] = {} for att, sig in zip(aggregated_attestations, aggregated_signatures, strict=True): proof_groups.setdefault(att.data, []).append(sig) @@ -844,6 +862,14 @@ def build_block( ) ) + metrics.lean_block_proposal_attestation_build_phase_seconds.labels( + phase="compact", + ).observe(time.perf_counter() - compact_start) + metrics.lean_block_proposal_attestation_builds_total.inc() + metrics.lean_block_proposal_child_payloads_consumed_total.inc(child_payloads_consumed) + metrics.lean_block_proposal_attestation_data_selected.observe(len(processed_att_data)) + metrics.lean_block_proposal_aggregates_selected.observe(len(aggregated_signatures)) + # Create the final block with selected attestations. final_block = self.block_class( slot=slot, @@ -1603,13 +1629,27 @@ def update_safe_target(self, store: LstarStore) -> LstarStore: # The head and attestation pools remain unchanged. return store.model_copy(update={"safe_target": safe_target}) - def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregatedAttestation]]: + def aggregate( + self, + store: LstarStore, + *, + skip_trivial_inputs: bool = True, + ) -> tuple[LstarStore, list[SignedAggregatedAttestation]]: """Turn raw validator votes into compact aggregated attestations. Validators cast individual signatures over gossip. Before those votes can influence fork choice or be included in a block, they must be combined into compact cryptographic proofs. + ``skip_trivial_inputs`` (default ``True``) is an **aggregator-role** + policy: when set, the ``1 raw + 0 children`` shape is skipped because + a single-validator "aggregate" carries no consensus signal beyond the + raw gossip sig already on the network (see issue #747). Interval-2 + aggregator ticks use the default. Block-building callers that must + fold every chosen ``att_data`` into ``latest_known_aggregated_payloads`` + (including lone gossip sigs) pass ``skip_trivial_inputs=False`` — + see ``BlockSpec`` in the consensus-testing filler. + The store holds three pools of attestation evidence: - **Gossip signatures**: individual validator votes arriving in real-time. @@ -1676,13 +1716,19 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate if e.validator_id not in covered ] - # The aggregation layer enforces a minimum: either at least one - # raw signature, or at least two child proofs to merge. - # - # A lone child proof is already a valid proof — nothing to do. + # Always skip cases where there is nothing to aggregate: + # - 0 raw + 0 children: nothing to aggregate. + # - 0 raw + 1 child: a lone child proof is already valid. if not raw_entries and len(child_proofs) < 2: continue + # Aggregator-role optimization (``skip_trivial_inputs=True``, the + # default): skip ``1 raw + 0 children``. Block-building callers + # pass ``skip_trivial_inputs=False`` so every gossip sig they + # seeded is folded into ``latest_known_aggregated_payloads``. + if skip_trivial_inputs and not child_proofs and len(raw_entries) <= 1: + continue + # Encode raw signers as a compact bitfield when present. # Child-only aggregation (no raw signatures) must pass None. if raw_entries: diff --git a/src/lean_spec/subspecs/metrics/registry.py b/src/lean_spec/subspecs/metrics/registry.py index 29bb5ba0..44aaee01 100644 --- a/src/lean_spec/subspecs/metrics/registry.py +++ b/src/lean_spec/subspecs/metrics/registry.py @@ -45,6 +45,35 @@ REORG_DEPTH_BUCKETS = (1, 2, 3, 5, 7, 10, 20, 30, 50, 100) """Block count. Reorg depths above 10 are rare and signal network issues.""" +BLOCK_PROPOSAL_ATTESTATION_PHASE_BUCKETS = ( + 0.001, + 0.005, + 0.01, + 0.025, + 0.05, + 0.1, + 0.25, + 0.5, + 1, + 2, + 4, + 8, +) +"""Seconds. Phase-level time inside block-proposal attestation selection.""" + +BLOCK_PROPOSAL_ATTESTATION_DATA_BUCKETS = (0, 1, 2, 4, 8, 16, 32) +"""Distinct AttestationData entries selected per proposal build.""" + +BLOCK_PROPOSAL_AGGREGATES_BUCKETS = (0, 1, 2, 4, 8, 16, 32, 64, 128) +"""Aggregated signature proofs in the proposal result after compaction.""" + +BLOCK_PROPOSAL_ATTESTATION_BUILD_PHASES = ( + "select_payloads", + "compact", + "stf_simulate", +) +"""Phase labels for lean_block_proposal_attestation_build_phase_seconds.""" + # Section labels for attestation aggregate coverage gauges. These match the # names printed in slot/report logs: timely, late, block, combined, # agg_start_new, proposal_payloads, proposal_gossip, and proposal_combined. @@ -154,6 +183,18 @@ class MetricsRegistry: lean_attestation_aggregate_coverage_diff_validators: Gauge | _NoOpMetric = _NOOP """Validator coverage delta between block payloads and timely pre-merge payloads.""" + # Block proposal attestation selection (build_block fixed-point loop) + lean_block_proposal_attestation_build_phase_seconds: Histogram | _NoOpMetric = _NOOP + """Phase-level time in block-proposal attestation selection.""" + lean_block_proposal_attestation_builds_total: Counter | _NoOpMetric = _NOOP + """Completed block-proposal attestation selection runs.""" + lean_block_proposal_child_payloads_consumed_total: Counter | _NoOpMetric = _NOOP + """Child aggregated payloads selected during greedy proof picking.""" + lean_block_proposal_attestation_data_selected: Histogram | _NoOpMetric = _NOOP + """Distinct AttestationData entries in the proposal block body.""" + lean_block_proposal_aggregates_selected: Histogram | _NoOpMetric = _NOOP + """Aggregated signature proofs in the proposal result after compaction.""" + # State transition lean_latest_justified_slot: Gauge | _NoOpMetric = _NOOP """Slot of the most recently justified checkpoint.""" @@ -316,6 +357,45 @@ def init( direction=direction, ).set(0) + # Block proposal attestation selection (build_block / getProposalAttestations) + self.lean_block_proposal_attestation_build_phase_seconds = Histogram( + "lean_block_proposal_attestation_build_phase_seconds", + ( + "Phase-level time in block-proposal attestation selection: " + "select_payloads (greedy child-payload pick), compact " + "(recursive merge of proofs per AttestationData), " + "stf_simulate (candidate block STF)." + ), + ["phase"], + buckets=BLOCK_PROPOSAL_ATTESTATION_PHASE_BUCKETS, + registry=reg, + ) + self.lean_block_proposal_attestation_builds_total = Counter( + "lean_block_proposal_attestation_builds_total", + "Completed block-proposal attestation selection runs (one per proposal attempt).", + registry=reg, + ) + self.lean_block_proposal_child_payloads_consumed_total = Counter( + "lean_block_proposal_child_payloads_consumed_total", + ( + "Child aggregated payloads selected during greedy proof picking " + "(before recursive compaction)." + ), + registry=reg, + ) + self.lean_block_proposal_attestation_data_selected = Histogram( + "lean_block_proposal_attestation_data_selected", + "Distinct AttestationData entries in the proposal block body.", + buckets=BLOCK_PROPOSAL_ATTESTATION_DATA_BUCKETS, + registry=reg, + ) + self.lean_block_proposal_aggregates_selected = Histogram( + "lean_block_proposal_aggregates_selected", + "Aggregated signature proofs in the proposal result after compaction.", + buckets=BLOCK_PROPOSAL_AGGREGATES_BUCKETS, + registry=reg, + ) + # State transition (leanMetrics: State Transition Metrics) self.lean_latest_justified_slot = Gauge( "lean_latest_justified_slot", diff --git a/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py b/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py index 496895b3..c42d17fa 100644 --- a/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py @@ -593,12 +593,33 @@ def test_multiple_attestation_data_grouped_separately( source=att_data_1.source, ) - # Validators 1 attests to data_1, validator 2 attests to data_2 - sig_1 = key_manager.sign_attestation_data(ValidatorIndex(1), att_data_1) - sig_2 = key_manager.sign_attestation_data(ValidatorIndex(2), att_data_2) + # Validators 1, 3 attest to data_1; validators 2, 0 attest to data_2. + # Two distinct sigs per att_data is the minimum non-trivial shape: + # `aggregate()` skips the `1 raw + 0 children` case (a single-validator + # "aggregate" carries no information the raw gossip sig doesn't + # already carry), so this test must seed at least two raw sigs per + # `att_data` for the per-data grouping it is asserting. attestation_signatures = { - att_data_1: {AttestationSignatureEntry(ValidatorIndex(1), sig_1)}, - att_data_2: {AttestationSignatureEntry(ValidatorIndex(2), sig_2)}, + att_data_1: { + AttestationSignatureEntry( + ValidatorIndex(1), + key_manager.sign_attestation_data(ValidatorIndex(1), att_data_1), + ), + AttestationSignatureEntry( + ValidatorIndex(3), + key_manager.sign_attestation_data(ValidatorIndex(3), att_data_1), + ), + }, + att_data_2: { + AttestationSignatureEntry( + ValidatorIndex(2), + key_manager.sign_attestation_data(ValidatorIndex(2), att_data_2), + ), + AttestationSignatureEntry( + ValidatorIndex(0), + key_manager.sign_attestation_data(ValidatorIndex(0), att_data_2), + ), + }, } store = base_store.model_copy( diff --git a/tests/lean_spec/forks/lstar/state/test_state_aggregation.py b/tests/lean_spec/forks/lstar/state/test_state_aggregation.py index cf3599c0..6579a0b3 100644 --- a/tests/lean_spec/forks/lstar/state/test_state_aggregation.py +++ b/tests/lean_spec/forks/lstar/state/test_state_aggregation.py @@ -141,6 +141,41 @@ def test_aggregate_with_empty_attestation_signatures( assert results == [] +def test_aggregate_skips_single_gossip_sig_with_no_children( + container_key_manager: XmssKeyManager, + spec: LstarSpec, +) -> None: + """Trivial 1 raw sig + 0 children case: aggregate returns nothing. + + A single-validator "aggregate" carries no information the raw gossip + sig doesn't already carry — the sig is on the per-subnet + `attestation_signatures` gossip topic at sign time, so any peer + aggregator can fold it in as a raw entry next round. The recursive + STARK prover is constant-cost in input size, so building a + 1-validator proof spends the full prover budget for zero consensus + signal. The unconsumed gossip sig must remain in + `store.attestation_signatures` so it is folded in by a future round + once another sig or a child shows up. + """ + store = make_store(num_validators=2, key_manager=container_key_manager) + source = Checkpoint(root=make_bytes32(1), slot=Slot(0)) + att_data = make_attestation_data_simple( + Slot(2), make_bytes32(3), make_bytes32(4), source=source + ) + sig_entry = AttestationSignatureEntry( + ValidatorIndex(0), + container_key_manager.sign_attestation_data(ValidatorIndex(0), att_data), + ) + attestation_signatures = {att_data: {sig_entry}} + + store = store.model_copy(update={"attestation_signatures": attestation_signatures}) + updated_store, results = spec.aggregate(store) + + assert results == [] + # The lone sig must survive untouched for a later, non-trivial pass. + assert updated_store.attestation_signatures == attestation_signatures + + def test_aggregated_signatures_with_multiple_data_groups( container_key_manager: XmssKeyManager, spec: LstarSpec, diff --git a/tests/lean_spec/subspecs/metrics/test_registry.py b/tests/lean_spec/subspecs/metrics/test_registry.py index 0ebe28f9..50448215 100644 --- a/tests/lean_spec/subspecs/metrics/test_registry.py +++ b/tests/lean_spec/subspecs/metrics/test_registry.py @@ -10,6 +10,7 @@ from lean_spec.subspecs.metrics.registry import ( ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS, ATTESTATION_AGGREGATE_COVERAGE_SECTIONS, + BLOCK_PROPOSAL_ATTESTATION_BUILD_PHASES, registry, ) @@ -53,3 +54,21 @@ def test_attestation_aggregate_coverage_metrics_registered() -> None: ) == 0.0 ) + + +def test_block_proposal_attestation_build_metrics_registered() -> None: + """Block-proposal attestation selection metrics are registered on init.""" + test_reg = CollectorRegistry() + registry.init(registry=test_reg) + + for phase in BLOCK_PROPOSAL_ATTESTATION_BUILD_PHASES: + assert ( + test_reg.get_sample_value( + "lean_block_proposal_attestation_build_phase_seconds_count", + {"phase": phase}, + ) + is None + ) + + assert test_reg.get_sample_value("lean_block_proposal_attestation_builds_total") == 0.0 + assert test_reg.get_sample_value("lean_block_proposal_child_payloads_consumed_total") == 0.0