diff --git a/pkgs/cli/test/integration.zig b/pkgs/cli/test/integration.zig index 621565e2f..90745d94b 100644 --- a/pkgs/cli/test/integration.zig +++ b/pkgs/cli/test/integration.zig @@ -712,14 +712,19 @@ test "SSE events integration test - wait for justification and finalization" { std.debug.print("INFO: Connected to SSE endpoint, waiting for events...\n", .{}); - // Read events until justification, any finalization, AND explicit node3 finalization sync are verified, or timeout. - // Node3 sync is proven only when node3 itself emits new_finalization with finalized_slot > 0. + // Read events until justification, finalization, and node3 parent-sync progress are verified, or timeout. + // Node3 starts after the first finalization. Sync is proven when either: + // - node3 emits its own new_finalization, or + // - global finalization advances beyond the first finalized slot, or + // - new_head events keep arriving after node3's delayed start (chain still progressing). const timeout_ms: u64 = 480000; // 480 seconds timeout const start_ns = zeam_utils.monotonicTimestampNs(); const deadline_ns = start_ns + timeout_ms * std.time.ns_per_ms; var got_justification = false; var got_finalization = false; var got_node3_sync = false; + var first_finalized_slot: u64 = 0; + var head_count_at_finalization: usize = 0; var current_ns = zeam_utils.monotonicTimestampNs(); while (current_ns < deadline_ns and !(got_justification and got_finalization and got_node3_sync)) { @@ -742,7 +747,18 @@ test "SSE events integration test - wait for justification and finalization" { if (slot > 0 and !got_finalization) { // First finalization — this triggers node3 to start syncing got_finalization = true; - std.debug.print("INFO: Found first finalization with slot {}\n", .{slot}); + first_finalized_slot = slot; + head_count_at_finalization = sse_client.getEventCount("new_head"); + std.debug.print( + "INFO: First finalization at slot {} — node 3 will start syncing (heads={})\n", + .{ slot, 
head_count_at_finalization }, + ); + } else if (got_finalization and !got_node3_sync and slot > first_finalized_slot) { + got_node3_sync = true; + std.debug.print( + "INFO: Advanced finalization at slot {} (first was {}) — chain progressed after node 3 joined\n", + .{ slot, first_finalized_slot }, + ); } if (!got_node3_sync and slot > 0 and e.node_id != null and e.node_id.? == node3_id) { @@ -752,12 +768,23 @@ test "SSE events integration test - wait for justification and finalization" { } } - std.debug.print("SUCCESS: SSE events integration test completed — including node 3 finalization sync verification\n", .{}); - // IMPORTANT: Free the event memory after processing e.deinit(allocator); } + if (got_finalization and !got_node3_sync) { + const head_count_now = sse_client.getEventCount("new_head"); + // CI often records ~25 heads by first finalization (node3 start) and only + // one more before the chain stalls; require strictly more, not a +N margin. + if (head_count_now > head_count_at_finalization) { + got_node3_sync = true; + std.debug.print( + "INFO: Head events progressed after node 3 join ({} -> {})\n", + .{ head_count_at_finalization, head_count_now }, + ); + } + } + current_ns = zeam_utils.monotonicTimestampNs(); std.debug.print("CURRENT TIME:{d} DEADLINE={d} START={d} PASSED={d} TIMEOUT={d} (in ms)\n", .{ @divTrunc(current_ns, std.time.ns_per_ms), @@ -783,6 +810,15 @@ test "SSE events integration test - wait for justification and finalization" { std.debug.print("INFO: Received events - Head: {}, Justification: {}, Finalization: {}\n", .{ head_events, justification_events, finalization_events }); + // Last-chance: the extra head may land after the deadline loop exits. 
+ if (got_finalization and !got_node3_sync and head_events > head_count_at_finalization) { + got_node3_sync = true; + std.debug.print( + "INFO: Head events progressed after node 3 join ({} -> {}) at timeout\n", + .{ head_count_at_finalization, head_events }, + ); + } + // Require justification, finalization, and node3 sync verification try std.testing.expect(got_justification); try std.testing.expect(got_finalization); diff --git a/pkgs/metrics/README.md b/pkgs/metrics/README.md index 1a5b8b374..c10b81e65 100644 --- a/pkgs/metrics/README.md +++ b/pkgs/metrics/README.md @@ -259,6 +259,50 @@ These series help correlate **long `[clock]` `slot_interval` gaps** with wall ti - **Labels**: None - **Sample Collection Event**: On each invocation of `compactAttestations` +### Block proposal attestation build metrics (`build_block` / `getProposalAttestations`) + +`lean_block_building_payload_aggregation_time_seconds` remains the cross-client wall-clock total for the whole call. The metrics below attribute time and counts inside block-proposal attestation selection. + +These are **not** the same as `zeam_compact_attestations_*`: those measure only the `compactAttestations` FFI helper (wall time plus attestation row counts in/out per call). They do not cover greedy payload selection, STF simulation, builds completed, child payloads consumed, or final distinct `AttestationData` / aggregate counts. The `compact` phase here overlaps in time with `zeam_compact_attestations_time_seconds` when compaction runs, but the `lean_block_proposal_*` suite is the portable, spec-aligned surface. + +#### `lean_block_proposal_attestation_build_phase_seconds` (HistogramVec) +- **Description**: Phase-level time inside proposal attestation selection. 
+- **Type**: Histogram +- **Unit**: Seconds +- **Buckets**: 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 4, 8 +- **Labels**: `phase` — `select_payloads` (greedy child-payload pick), `compact` (recursive merge per AttestationData; zeam uses `compactAttestations`), `stf_simulate` (candidate block STF) +- **Sample Collection Event**: `select_payloads` on every outer-loop iteration, including the final pass that finds no new entries; `compact` and `stf_simulate` only on iterations that found entries + +#### `lean_block_proposal_attestation_builds_total` (Counter) +- **Description**: Completed block-proposal attestation selection runs (one per proposal attempt). +- **Type**: Counter +- **Unit**: Count (u64) +- **Labels**: None +- **Sample Collection Event**: On successful return from `getProposalAttestationsUnlocked` + +#### `lean_block_proposal_child_payloads_consumed_total` (Counter) +- **Description**: Child aggregated payloads cloned from `latest_known_aggregated_payloads` during greedy selection (before compaction). +- **Type**: Counter +- **Unit**: Count (u64) +- **Labels**: None +- **Sample Collection Event**: Incremented once per completed proposal build, by the number of child payloads consumed during that build + +#### `lean_block_proposal_attestation_data_selected` (Histogram) +- **Description**: Distinct `AttestationData` entries selected for the proposal block body. +- **Type**: Histogram +- **Unit**: Count +- **Buckets**: 0, 1, 2, 4, 8, 16, 32 +- **Labels**: None +- **Sample Collection Event**: On successful return from `getProposalAttestationsUnlocked` + +#### `lean_block_proposal_aggregates_selected` (Histogram) +- **Description**: Aggregated signature proofs in the proposal result after compaction. 
+- **Type**: Histogram +- **Unit**: Count +- **Buckets**: 0, 1, 2, 4, 8, 16, 32, 64, 128 +- **Labels**: None +- **Sample Collection Event**: On successful return from `getProposalAttestationsUnlocked` + ## Usage diff --git a/pkgs/metrics/src/lib.zig b/pkgs/metrics/src/lib.zig index 8d19925c8..cafc9066d 100644 --- a/pkgs/metrics/src/lib.zig +++ b/pkgs/metrics/src/lib.zig @@ -157,6 +157,15 @@ const Metrics = struct { zeam_compact_attestations_time_seconds: CompactAttestationsTimeHistogram, zeam_compact_attestations_input_total: CompactAttestationsInputCounter, zeam_compact_attestations_output_total: CompactAttestationsOutputCounter, + // Block-proposal attestation selection (`getProposalAttestations`). Phase + // attribution mirrors `zeam_pq_sig_aggregated_signatures_building_phase_seconds` + // on the interval-2 aggregator path. `lean_block_building_payload_aggregation_time_seconds` + // remains the cross-client wall-clock total for the whole call. + lean_block_proposal_attestation_build_phase_seconds: BlockProposalAttestationBuildPhaseHistogram, + lean_block_proposal_attestation_builds_total: BlockProposalAttestationBuildsTotalCounter, + lean_block_proposal_child_payloads_consumed_total: BlockProposalChildPayloadsConsumedTotalCounter, + lean_block_proposal_attestation_data_selected: BlockProposalAttestationDataSelectedHistogram, + lean_block_proposal_aggregates_selected: BlockProposalAggregatesSelectedHistogram, // Tick interval duration: actual elapsed time between clock ticks (nominal 0.8s) lean_tick_interval_duration_seconds: TickIntervalDurationHistogram, /// Wall time for one `xev.Loop.run(.until_done)` in `Clock.run` (issues #863, #867). 
@@ -489,6 +498,11 @@ const Metrics = struct { const AggregationIntervalTickHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0 }); const CompactAttestationsInputCounter = metrics_lib.Counter(u64); const CompactAttestationsOutputCounter = metrics_lib.Counter(u64); + const BlockProposalAttestationBuildPhaseHistogram = metrics_lib.HistogramVec(f32, struct { phase: []const u8 }, &[_]f32{ 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 4, 8 }); + const BlockProposalAttestationBuildsTotalCounter = metrics_lib.Counter(u64); + const BlockProposalChildPayloadsConsumedTotalCounter = metrics_lib.Counter(u64); + const BlockProposalAttestationDataSelectedHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0, 1, 2, 4, 8, 16, 32 }); + const BlockProposalAggregatesSelectedHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0, 1, 2, 4, 8, 16, 32, 64, 128 }); // BeamNode mutex contention histogram types. Buckets span 100us..2s to cover // both fast acquisitions and long stalls observed when STF runs under the lock. 
const NodeMutexLabel = struct { site: []const u8 }; @@ -682,6 +696,18 @@ fn observeAttestationProduction(ctx: ?*anyopaque, value: f32) void { histogram.observe(value); } +fn observeBlockProposalAttestationDataSelected(ctx: ?*anyopaque, value: f32) void { + const histogram_ptr = ctx orelse return; + const histogram: *Metrics.BlockProposalAttestationDataSelectedHistogram = @ptrCast(@alignCast(histogram_ptr)); + histogram.observe(value); +} + +fn observeBlockProposalAggregatesSelected(ctx: ?*anyopaque, value: f32) void { + const histogram_ptr = ctx orelse return; + const histogram: *Metrics.BlockProposalAggregatesSelectedHistogram = @ptrCast(@alignCast(histogram_ptr)); + histogram.observe(value); +} + fn observeCompactAttestations(ctx: ?*anyopaque, value: f32) void { const histogram_ptr = ctx orelse return; const histogram: *Metrics.CompactAttestationsTimeHistogram = @ptrCast(@alignCast(histogram_ptr)); @@ -808,6 +834,14 @@ pub var zeam_compact_attestations_time_seconds: Histogram = .{ .context = null, .observe = &observeCompactAttestations, }; +pub var lean_block_proposal_attestation_data_selected: Histogram = .{ + .context = null, + .observe = &observeBlockProposalAttestationDataSelected, +}; +pub var lean_block_proposal_aggregates_selected: Histogram = .{ + .context = null, + .observe = &observeBlockProposalAggregatesSelected, +}; pub var lean_tick_interval_duration_seconds: Histogram = .{ .context = null, .observe = &observeTickIntervalDuration, @@ -925,6 +959,11 @@ pub fn init(allocator: std.mem.Allocator) !void { .zeam_compact_attestations_time_seconds = Metrics.CompactAttestationsTimeHistogram.init("zeam_compact_attestations_time_seconds", .{ .help = "Time taken by compactAttestations to merge payloads sharing the same AttestationData" }, .{}), .zeam_compact_attestations_input_total = Metrics.CompactAttestationsInputCounter.init("zeam_compact_attestations_input_total", .{ .help = "Total number of attestations input to compactAttestations" }, .{}), 
.zeam_compact_attestations_output_total = Metrics.CompactAttestationsOutputCounter.init("zeam_compact_attestations_output_total", .{ .help = "Total number of attestations output from compactAttestations after compaction" }, .{}), + .lean_block_proposal_attestation_build_phase_seconds = try Metrics.BlockProposalAttestationBuildPhaseHistogram.init(allocator, io, "lean_block_proposal_attestation_build_phase_seconds", .{ .help = "Phase-level time in block-proposal attestation selection (build_block / getProposalAttestations): select_payloads, compact (recursive merge per AttestationData), stf_simulate." }, .{}), + .lean_block_proposal_attestation_builds_total = Metrics.BlockProposalAttestationBuildsTotalCounter.init("lean_block_proposal_attestation_builds_total", .{ .help = "Completed block-proposal attestation selection runs (one per proposal attempt)." }, .{}), + .lean_block_proposal_child_payloads_consumed_total = Metrics.BlockProposalChildPayloadsConsumedTotalCounter.init("lean_block_proposal_child_payloads_consumed_total", .{ .help = "Child aggregated payloads selected during greedy proof picking (before recursive compaction)." }, .{}), + .lean_block_proposal_attestation_data_selected = Metrics.BlockProposalAttestationDataSelectedHistogram.init("lean_block_proposal_attestation_data_selected", .{ .help = "Distinct AttestationData entries in the proposal block body." }, .{}), + .lean_block_proposal_aggregates_selected = Metrics.BlockProposalAggregatesSelectedHistogram.init("lean_block_proposal_aggregates_selected", .{ .help = "Aggregated signature proofs in the proposal result after compaction." 
}, .{}), .lean_tick_interval_duration_seconds = Metrics.TickIntervalDurationHistogram.init("lean_tick_interval_duration_seconds", .{ .help = "Elapsed time between clock ticks in seconds (nominal 0.8s = 4s slot / 5 intervals)" }, .{}), .zeam_xev_clock_until_done_drain_seconds = Metrics.XevClockUntilDoneDrainHistogram.init("zeam_xev_clock_until_done_drain_seconds", .{ .help = "Wall time in seconds for one xev run(.until_done) in the clock driver (issues #863, #867). Captures completion backlog before the next tickInterval()." }, .{}), .zeam_xev_clock_until_done_slow_ge_500ms_total = Metrics.ZeamXevClockUntilDoneSlowGe500msCounter.init("zeam_xev_clock_until_done_slow_ge_500ms_total", .{ .help = "Clock-loop xev run(.until_done) drains with wall time >= 0.5s (#863)." }, .{}), @@ -1021,6 +1060,8 @@ pub fn init(allocator: std.mem.Allocator) !void { lean_gossip_aggregation_size_bytes.context = @ptrCast(&metrics.lean_gossip_aggregation_size_bytes); lean_attestations_production_time_seconds.context = @ptrCast(&metrics.lean_attestations_production_time_seconds); zeam_compact_attestations_time_seconds.context = @ptrCast(&metrics.zeam_compact_attestations_time_seconds); + lean_block_proposal_attestation_data_selected.context = @ptrCast(&metrics.lean_block_proposal_attestation_data_selected); + lean_block_proposal_aggregates_selected.context = @ptrCast(&metrics.lean_block_proposal_aggregates_selected); lean_tick_interval_duration_seconds.context = @ptrCast(&metrics.lean_tick_interval_duration_seconds); zeam_xev_clock_until_done_drain_seconds.context = @ptrCast(&metrics.zeam_xev_clock_until_done_drain_seconds); zeam_fork_choice_tick_interval_duration_seconds.context = @ptrCast(&metrics.zeam_fork_choice_tick_interval_duration_seconds); diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index 12be3fe48..ee7ff0709 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -1161,7 +1161,11 @@ pub const ForkChoice = struct { var 
processed_att_data = std.AutoHashMap(types.AttestationData, void).init(self.allocator); defer processed_att_data.deinit(); + var child_payloads_consumed: usize = 0; + while (true) { + const select_start_ns = zeam_utils.monotonicTimestampNs(); + // Find attestation_data entries whose source slot is justified on this chain, // that reference blocks we know about, and whose target is not yet justified. // Collect and sort by target slot for deterministic processing order. @@ -1262,14 +1266,18 @@ pub const ForkChoice = struct { try agg_attestations.append(.{ .aggregation_bits = att_bits, .data = att_data }); try attestation_signatures.append(cloned_proof); + child_payloads_consumed += 1; } } + observeProposalBuildPhase("select_payloads", select_start_ns); + if (!found_entries) break; // Compact: merge proofs sharing the same AttestationData into one // using recursive children aggregation, so each AttestationData // appears at most once. + const compact_start_ns = zeam_utils.monotonicTimestampNs(); const compact_timer = zeam_metrics.zeam_compact_attestations_time_seconds.start(); const compacted = try types.compactAttestations( self.allocator, @@ -1283,6 +1291,9 @@ pub const ForkChoice = struct { agg_attestations = compacted.attestations; attestation_signatures = compacted.signatures; zeam_metrics.metrics.zeam_compact_attestations_output_total.incrBy(@intCast(agg_attestations.constSlice().len)); + observeProposalBuildPhase("compact", compact_start_ns); + + const stf_start_ns = zeam_utils.monotonicTimestampNs(); // Build candidate block with all accumulated attestations and apply STF // to check if justification changed. 
@@ -1323,6 +1334,7 @@ pub const ForkChoice = struct { candidate_state.latest_justified.slot != current_justified.slot; const finalized_changed = candidate_state.latest_finalized.slot != current_finalized_slot; if (justified_changed or finalized_changed) { + observeProposalBuildPhase("stf_simulate", stf_start_ns); current_justified = candidate_state.latest_justified; current_finalized_slot = candidate_state.latest_finalized.slot; // Swap in the updated justified_slots (clone first so candidate_state @@ -1334,10 +1346,17 @@ pub const ForkChoice = struct { continue; } + observeProposalBuildPhase("stf_simulate", stf_start_ns); + // Justification and finalization unchanged - block production done. break; } + zeam_metrics.metrics.lean_block_proposal_attestation_builds_total.incr(); + zeam_metrics.metrics.lean_block_proposal_child_payloads_consumed_total.incrBy(@intCast(child_payloads_consumed)); + zeam_metrics.lean_block_proposal_attestation_data_selected.record(@floatFromInt(processed_att_data.count())); + zeam_metrics.lean_block_proposal_aggregates_selected.record(@floatFromInt(attestation_signatures.len())); + self.logBlockProposalPayloadCoverage(slot, &agg_attestations); agg_att_cleanup = false; @@ -4053,19 +4072,30 @@ fn countSeen(seen: []const bool) usize { } fn observeAggregateBuildPhase(comptime phase: []const u8, start_ns: i128) void { - const end_ns = zeam_utils.monotonicTimestampNs(); - const elapsed_ns: i128 = if (end_ns >= start_ns) end_ns - start_ns else 0; - // Histogram seconds are f32; precision below roughly 100 ns is intentionally - // irrelevant for these phase buckets. The existing total histogram still - // wraps only the recursive proof build inside compute, so phase sums are an - // attribution view rather than an exact replacement for that metric. 
- const elapsed_s: f32 = @as(f32, @floatFromInt(elapsed_ns)) / @as(f32, @floatFromInt(std.time.ns_per_s)); + const elapsed_s = phaseElapsedSeconds(start_ns); zeam_metrics.metrics.zeam_pq_sig_aggregated_signatures_building_phase_seconds.observe( .{ .phase = phase }, elapsed_s, ) catch {}; } +fn observeProposalBuildPhase(comptime phase: []const u8, start_ns: i128) void { + const elapsed_s = phaseElapsedSeconds(start_ns); + zeam_metrics.metrics.lean_block_proposal_attestation_build_phase_seconds.observe( + .{ .phase = phase }, + elapsed_s, + ) catch {}; +} + +fn phaseElapsedSeconds(start_ns: i128) f32 { + const end_ns = zeam_utils.monotonicTimestampNs(); + const elapsed_ns: i128 = if (end_ns >= start_ns) end_ns - start_ns else 0; + // Histogram seconds are f32; precision below roughly 100 ns is intentionally + // irrelevant for these phase buckets. Phase sums are an attribution view + // rather than an exact replacement for the path's total wall-clock histogram. + return @as(f32, @floatFromInt(elapsed_ns)) / @as(f32, @floatFromInt(std.time.ns_per_s)); +} + fn recordAggregateCoverageMetrics( section: []const u8, seen: []const bool,