Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ def _metrics_response(_store: Store, _fixture: "ApiEndpointTest") -> dict[str, A
"lean_attestation_aggregate_coverage_validators",
"lean_attestation_aggregate_coverage_subnets",
"lean_attestation_aggregate_coverage_diff_validators",
"lean_block_proposal_attestation_build_phase_seconds",
"lean_block_proposal_attestation_builds_total",
"lean_block_proposal_child_payloads_consumed_total",
"lean_block_proposal_attestation_data_selected",
"lean_block_proposal_aggregates_selected",
"lean_latest_justified_slot",
"lean_latest_finalized_slot",
"lean_state_transition_time_seconds",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ def build_signed_block_with_store(
# Aggregation runs on a local clone: gossip pools mutate here, but the
# caller's gossip-signature view must not be consumed by this simulated
# build. Only the freshly aggregated Type-1 payloads propagate back.
aggregation_store, _ = spec.aggregate(store)
aggregation_store, _ = spec.aggregate(store, skip_trivial_inputs=False)
merged_store = spec.accept_new_attestations(aggregation_store)

# Build the block through the spec's State.build_block().
Expand Down
56 changes: 51 additions & 5 deletions src/lean_spec/forks/lstar/spec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Lstar fork — identity and construction facade."""

import math
import time
from collections import defaultdict
from collections.abc import Iterable, Sequence, Set as AbstractSet
from typing import Any, ClassVar
Expand Down Expand Up @@ -32,6 +33,7 @@
JUSTIFICATION_LOOKBACK_SLOTS,
MAX_ATTESTATIONS_DATA,
)
from lean_spec.subspecs.metrics.registry import registry as metrics
from lean_spec.subspecs.observability import (
observe_on_attestation,
observe_on_block,
Expand Down Expand Up @@ -702,8 +704,10 @@ def build_block(
)

processed_att_data: set[AttestationData] = set()
child_payloads_consumed = 0

while True:
select_start = time.perf_counter()
found_entries = False

for att_data, proofs in sorted(
Expand Down Expand Up @@ -759,6 +763,7 @@ def build_block(
found_entries = True

selected, _ = TypeOneMultiSignature.select_greedily(proofs)
child_payloads_consumed += len(selected)
aggregated_signatures.extend(selected)
for proof in selected:
aggregated_attestations.append(
Expand All @@ -768,9 +773,15 @@ def build_block(
)
)

metrics.lean_block_proposal_attestation_build_phase_seconds.labels(
phase="select_payloads",
).observe(time.perf_counter() - select_start)

if not found_entries:
break

stf_start = time.perf_counter()

# Build candidate block and check if justification changed.
candidate_block = self.block_class(
slot=slot,
Expand All @@ -794,11 +805,17 @@ def build_block(
post_state.latest_justified != current_justified
or post_state.latest_finalized.slot != current_finalized_slot
):
metrics.lean_block_proposal_attestation_build_phase_seconds.labels(
phase="stf_simulate",
).observe(time.perf_counter() - stf_start)
current_justified = post_state.latest_justified
current_justified_slots = post_state.justified_slots
current_finalized_slot = post_state.latest_finalized.slot
continue

metrics.lean_block_proposal_attestation_build_phase_seconds.labels(
phase="stf_simulate",
).observe(time.perf_counter() - stf_start)
break

# Compact: merge all proofs sharing the same AttestationData into one
Expand All @@ -807,6 +824,7 @@ def build_block(
# During the fixed-point loop above, multiple proofs may have been
# selected for the same AttestationData across iterations. Group them
# and merge each group into a single recursive proof.
compact_start = time.perf_counter()
proof_groups: dict[AttestationData, list[TypeOneMultiSignature]] = {}
for att, sig in zip(aggregated_attestations, aggregated_signatures, strict=True):
proof_groups.setdefault(att.data, []).append(sig)
Expand Down Expand Up @@ -844,6 +862,14 @@ def build_block(
)
)

metrics.lean_block_proposal_attestation_build_phase_seconds.labels(
phase="compact",
).observe(time.perf_counter() - compact_start)
metrics.lean_block_proposal_attestation_builds_total.inc()
metrics.lean_block_proposal_child_payloads_consumed_total.inc(child_payloads_consumed)
metrics.lean_block_proposal_attestation_data_selected.observe(len(processed_att_data))
metrics.lean_block_proposal_aggregates_selected.observe(len(aggregated_signatures))

# Create the final block with selected attestations.
final_block = self.block_class(
slot=slot,
Expand Down Expand Up @@ -1603,13 +1629,27 @@ def update_safe_target(self, store: LstarStore) -> LstarStore:
# The head and attestation pools remain unchanged.
return store.model_copy(update={"safe_target": safe_target})

def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregatedAttestation]]:
def aggregate(
self,
store: LstarStore,
*,
skip_trivial_inputs: bool = True,
) -> tuple[LstarStore, list[SignedAggregatedAttestation]]:
"""Turn raw validator votes into compact aggregated attestations.

Validators cast individual signatures over gossip. Before those
votes can influence fork choice or be included in a block, they
must be combined into compact cryptographic proofs.

``skip_trivial_inputs`` (default ``True``) is an **aggregator-role**
policy: when set, the ``1 raw + 0 children`` shape is skipped because
a single-validator "aggregate" carries no consensus signal beyond the
raw gossip sig already on the network (see issue #747). Interval-2
aggregator ticks use the default. Block-building callers that must
fold every chosen ``att_data`` into ``latest_known_aggregated_payloads``
(including lone gossip sigs) pass ``skip_trivial_inputs=False`` —
see ``BlockSpec`` in the consensus-testing filler.

The store holds three pools of attestation evidence:

- **Gossip signatures**: individual validator votes arriving in real-time.
Expand Down Expand Up @@ -1676,13 +1716,19 @@ def aggregate(self, store: LstarStore) -> tuple[LstarStore, list[SignedAggregate
if e.validator_id not in covered
]

# The aggregation layer enforces a minimum: either at least one
# raw signature, or at least two child proofs to merge.
#
# A lone child proof is already a valid proof — nothing to do.
# Always skip cases where there is nothing to aggregate:
# - 0 raw + 0 children: nothing to aggregate.
# - 0 raw + 1 child: a lone child proof is already valid.
if not raw_entries and len(child_proofs) < 2:
continue

# Aggregator-role optimization (``skip_trivial_inputs=True``, the
# default): skip ``1 raw + 0 children``. Block-building callers
# pass ``skip_trivial_inputs=False`` so every gossip sig they
# seeded is folded into ``latest_known_aggregated_payloads``.
if skip_trivial_inputs and not child_proofs and len(raw_entries) <= 1:
continue

# Encode raw signers as a compact bitfield when present.
# Child-only aggregation (no raw signatures) must pass None.
if raw_entries:
Expand Down
80 changes: 80 additions & 0 deletions src/lean_spec/subspecs/metrics/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,35 @@
REORG_DEPTH_BUCKETS = (1, 2, 3, 5, 7, 10, 20, 30, 50, 100)
"""Block count. Reorg depths above 10 are rare and signal network issues."""

BLOCK_PROPOSAL_ATTESTATION_PHASE_BUCKETS = (
0.001,
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1,
2,
4,
8,
)
"""Seconds. Phase-level time inside block-proposal attestation selection."""

BLOCK_PROPOSAL_ATTESTATION_DATA_BUCKETS = (0, 1, 2, 4, 8, 16, 32)
"""Distinct AttestationData entries selected per proposal build."""

BLOCK_PROPOSAL_AGGREGATES_BUCKETS = (0, 1, 2, 4, 8, 16, 32, 64, 128)
"""Aggregated signature proofs in the proposal result after compaction."""

BLOCK_PROPOSAL_ATTESTATION_BUILD_PHASES = (
"select_payloads",
"compact",
"stf_simulate",
)
"""Phase labels for lean_block_proposal_attestation_build_phase_seconds."""

# Section labels for attestation aggregate coverage gauges. These match the
# names printed in slot/report logs: timely, late, block, combined,
# agg_start_new, proposal_payloads, proposal_gossip, and proposal_combined.
Expand Down Expand Up @@ -154,6 +183,18 @@ class MetricsRegistry:
lean_attestation_aggregate_coverage_diff_validators: Gauge | _NoOpMetric = _NOOP
"""Validator coverage delta between block payloads and timely pre-merge payloads."""

# Block proposal attestation selection (build_block fixed-point loop)
lean_block_proposal_attestation_build_phase_seconds: Histogram | _NoOpMetric = _NOOP
"""Phase-level time in block-proposal attestation selection."""
lean_block_proposal_attestation_builds_total: Counter | _NoOpMetric = _NOOP
"""Completed block-proposal attestation selection runs."""
lean_block_proposal_child_payloads_consumed_total: Counter | _NoOpMetric = _NOOP
"""Child aggregated payloads selected during greedy proof picking."""
lean_block_proposal_attestation_data_selected: Histogram | _NoOpMetric = _NOOP
"""Distinct AttestationData entries in the proposal block body."""
lean_block_proposal_aggregates_selected: Histogram | _NoOpMetric = _NOOP
"""Aggregated signature proofs in the proposal result after compaction."""

# State transition
lean_latest_justified_slot: Gauge | _NoOpMetric = _NOOP
"""Slot of the most recently justified checkpoint."""
Expand Down Expand Up @@ -316,6 +357,45 @@ def init(
direction=direction,
).set(0)

# Block proposal attestation selection (build_block / getProposalAttestations)
self.lean_block_proposal_attestation_build_phase_seconds = Histogram(
"lean_block_proposal_attestation_build_phase_seconds",
(
"Phase-level time in block-proposal attestation selection: "
"select_payloads (greedy child-payload pick), compact "
"(recursive merge of proofs per AttestationData), "
"stf_simulate (candidate block STF)."
),
["phase"],
buckets=BLOCK_PROPOSAL_ATTESTATION_PHASE_BUCKETS,
registry=reg,
)
self.lean_block_proposal_attestation_builds_total = Counter(
"lean_block_proposal_attestation_builds_total",
"Completed block-proposal attestation selection runs (one per proposal attempt).",
registry=reg,
)
self.lean_block_proposal_child_payloads_consumed_total = Counter(
"lean_block_proposal_child_payloads_consumed_total",
(
"Child aggregated payloads selected during greedy proof picking "
"(before recursive compaction)."
),
registry=reg,
)
self.lean_block_proposal_attestation_data_selected = Histogram(
"lean_block_proposal_attestation_data_selected",
"Distinct AttestationData entries in the proposal block body.",
buckets=BLOCK_PROPOSAL_ATTESTATION_DATA_BUCKETS,
registry=reg,
)
self.lean_block_proposal_aggregates_selected = Histogram(
"lean_block_proposal_aggregates_selected",
"Aggregated signature proofs in the proposal result after compaction.",
buckets=BLOCK_PROPOSAL_AGGREGATES_BUCKETS,
registry=reg,
)

# State transition (leanMetrics: State Transition Metrics)
self.lean_latest_justified_slot = Gauge(
"lean_latest_justified_slot",
Expand Down
31 changes: 26 additions & 5 deletions tests/lean_spec/forks/lstar/forkchoice/test_store_attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,33 @@ def test_multiple_attestation_data_grouped_separately(
source=att_data_1.source,
)

# Validators 1 attests to data_1, validator 2 attests to data_2
sig_1 = key_manager.sign_attestation_data(ValidatorIndex(1), att_data_1)
sig_2 = key_manager.sign_attestation_data(ValidatorIndex(2), att_data_2)
# Validators 1, 3 attest to data_1; validators 2, 0 attest to data_2.
# Two distinct sigs per att_data is the minimum non-trivial shape:
# `aggregate()` skips the `1 raw + 0 children` case (a single-validator
# "aggregate" carries no information the raw gossip sig doesn't
# already carry), so this test must seed at least two raw sigs per
# `att_data` for the per-data grouping it is asserting.
attestation_signatures = {
att_data_1: {AttestationSignatureEntry(ValidatorIndex(1), sig_1)},
att_data_2: {AttestationSignatureEntry(ValidatorIndex(2), sig_2)},
att_data_1: {
AttestationSignatureEntry(
ValidatorIndex(1),
key_manager.sign_attestation_data(ValidatorIndex(1), att_data_1),
),
AttestationSignatureEntry(
ValidatorIndex(3),
key_manager.sign_attestation_data(ValidatorIndex(3), att_data_1),
),
},
att_data_2: {
AttestationSignatureEntry(
ValidatorIndex(2),
key_manager.sign_attestation_data(ValidatorIndex(2), att_data_2),
),
AttestationSignatureEntry(
ValidatorIndex(0),
key_manager.sign_attestation_data(ValidatorIndex(0), att_data_2),
),
},
}

store = base_store.model_copy(
Expand Down
35 changes: 35 additions & 0 deletions tests/lean_spec/forks/lstar/state/test_state_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,41 @@ def test_aggregate_with_empty_attestation_signatures(
assert results == []


def test_aggregate_skips_single_gossip_sig_with_no_children(
container_key_manager: XmssKeyManager,
spec: LstarSpec,
) -> None:
"""Trivial 1 raw sig + 0 children case: aggregate returns nothing.

A single-validator "aggregate" carries no information the raw gossip
sig doesn't already carry — the sig is on the per-subnet
`attestation_signatures` gossip topic at sign time, so any peer
aggregator can fold it in as a raw entry next round. The recursive
STARK prover is constant-cost in input size, so building a
1-validator proof spends the full prover budget for zero consensus
signal. The unconsumed gossip sig must remain in
`store.attestation_signatures` so it is folded in by a future round
once another sig or a child shows up.
"""
store = make_store(num_validators=2, key_manager=container_key_manager)
source = Checkpoint(root=make_bytes32(1), slot=Slot(0))
att_data = make_attestation_data_simple(
Slot(2), make_bytes32(3), make_bytes32(4), source=source
)
sig_entry = AttestationSignatureEntry(
ValidatorIndex(0),
container_key_manager.sign_attestation_data(ValidatorIndex(0), att_data),
)
attestation_signatures = {att_data: {sig_entry}}

store = store.model_copy(update={"attestation_signatures": attestation_signatures})
updated_store, results = spec.aggregate(store)

assert results == []
# The lone sig must survive untouched for a later, non-trivial pass.
assert updated_store.attestation_signatures == attestation_signatures


def test_aggregated_signatures_with_multiple_data_groups(
container_key_manager: XmssKeyManager,
spec: LstarSpec,
Expand Down
19 changes: 19 additions & 0 deletions tests/lean_spec/subspecs/metrics/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from lean_spec.subspecs.metrics.registry import (
ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS,
ATTESTATION_AGGREGATE_COVERAGE_SECTIONS,
BLOCK_PROPOSAL_ATTESTATION_BUILD_PHASES,
registry,
)

Expand Down Expand Up @@ -53,3 +54,21 @@ def test_attestation_aggregate_coverage_metrics_registered() -> None:
)
== 0.0
)


def test_block_proposal_attestation_build_metrics_registered() -> None:
"""Block-proposal attestation selection metrics are registered on init."""
test_reg = CollectorRegistry()
registry.init(registry=test_reg)

for phase in BLOCK_PROPOSAL_ATTESTATION_BUILD_PHASES:
assert (
test_reg.get_sample_value(
"lean_block_proposal_attestation_build_phase_seconds_count",
{"phase": phase},
)
is None
)

assert test_reg.get_sample_value("lean_block_proposal_attestation_builds_total") == 0.0
assert test_reg.get_sample_value("lean_block_proposal_child_payloads_consumed_total") == 0.0
Loading