diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index be3622d..d8bf604 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -21,7 +21,7 @@ jobs: go-version: '1.22' - uses: 'actions/checkout@v4' - name: 'golangci-lint' - uses: 'golangci/golangci-lint-action@v6' + uses: 'golangci/golangci-lint-action@v8' with: version: 'latest' args: '--timeout=60m' diff --git a/.golangci.yml b/.golangci.yml index d431254..d14151f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,120 +1,14 @@ -# This file contains all available configuration options -# with their default values (in comments). -# -# This file is not a configuration example, -# it contains the exhaustive configuration with explanations of the options. - -issues: - # Which files to exclude: they will be analyzed, but issues from them won't be reported. - # There is no need to include all autogenerated files, - # we confidently recognize autogenerated files. - # If it's not, please let us know. - # "/" will be replaced by current OS file path separator to properly work on Windows. - # Default: [] - exclude-files: - - ".*_ssz\\.go$" - - include: - - 'EXC0002' - - 'EXC0005' - - 'EXC0009' - - 'EXC0011' - - 'EXC0012' - - 'EXC0014' - -# Options for analysis running. +version: "2" run: - # The default concurrency value is the number of available CPU. - # concurrency: 4 - - # Timeout for analysis, e.g. 30s, 5m. - # Default: 1m - timeout: 10m - - # Exit code when at least one issue was found. - # Default: 1 - # issues-exit-code: 2 - - # Include test files or not. - # Default: true - tests: false - - # List of build tags, all linters use it. - # Default: []. - # build-tags: - # - mytag - - # Which dirs to skip: issues from them won't be reported. - # Can use regexp here: `generated.*`, regexp is applied on full path. - # Default value is empty list, - # but default dirs are skipped independently of this option's value (see skip-dirs-use-default). - # "/" will be replaced by current OS file path separator to properly work on Windows. - # skip-dirs: - # - autogenerated_by_my_lib - - # Enables skipping of directories: - # - vendor$, third_party$, testdata$, examples$, Godeps$, builtin$ - # Default: true - # skip-dirs-use-default: false - - # If set we pass it to "go list -mod={option}". From "go help modules": - # If invoked with -mod=readonly, the go command is disallowed from the implicit - # automatic updating of go.mod described above. Instead, it fails when any changes - # to go.mod are needed. This setting is most useful to check that go.mod does - # not need updates, such as in a continuous integration and testing system. - # If invoked with -mod=vendor, the go command assumes that the vendor - # directory holds the correct copies of dependencies and ignores - # the dependency descriptions in go.mod. - # - # Allowed values: readonly|vendor|mod - # By default, it isn't set. modules-download-mode: readonly - - # Allow multiple parallel golangci-lint instances running. - # If false (default) - golangci-lint acquires file lock on start. + tests: false allow-parallel-runners: true - - # Define the Go version limit. - # Mainly related to generics support since go1.18. - # Default: use Go version from the go.mod file, fallback on the env var `GOVERSION`, fallback on 1.18 - # go: '1.19' - - -# output configuration options output: formats: - - format: colored-line-number + text: path: stderr - -# All available settings of specific linters. 
-linters-settings: - gosec: - excludes: - - G115 # This generates a lot of false positives, recheck once https://github.com/securego/gosec/issues/1212 is closed - - lll: - line-length: 132 - - nlreturn: - # Allow two-line blocks without requiring a newline - block-size: 3 - - stylecheck: - checks: [ "all", "-ST1000" ] - - tagliatelle: - case: - # use-field-name: true - rules: - json: snake - yaml: snake - linters: - # Enable all available linters. - # Default: false - enable-all: true - # Disable specific linter - # https://golangci-lint.run/usage/linters/#disabled-by-default + default: all disable: - cyclop - depguard @@ -124,20 +18,58 @@ linters: - exhaustruct - forcetypeassert - funlen - - gci - gochecknoglobals - gocognit - goconst - - ireturn + - gomodguard - inamedparam + - ireturn - lll - mnd - nestif - nilnil - nlreturn + - noinlineerr - perfsprint - promlinter - - tenv - varnamelen - wrapcheck - wsl + - wsl_v5 + settings: + gosec: + excludes: + - G115 + nlreturn: + block-size: 3 + staticcheck: + checks: + - all + - -ST1000 + tagliatelle: + case: + rules: + json: snake + yaml: snake + exclusions: + generated: lax + presets: + - common-false-positives + - std-error-handling + paths: + - .*_ssz\.go$ + - third_party$ + - builtin$ + - examples$ +formatters: + enable: + - gofmt + - gofumpt + - goimports + exclusions: + generated: lax + paths: + - .*_ssz\.go$ + - third_party$ + - builtin$ + - examples$ diff --git a/docs/adr/0001-summarizer-observability-shape.md b/docs/adr/0001-summarizer-observability-shape.md new file mode 100644 index 0000000..3d315d1 --- /dev/null +++ b/docs/adr/0001-summarizer-observability-shape.md @@ -0,0 +1,149 @@ +# 0001 — Use a single labelled lag gauge for summarizer stall observability + +* Status: Accepted (2026-04-30); amended 2026-05-05 — see "Amendment 1" below. +* Date: 2026-04-30 +* Scope: `services/summarizer/standard` in `wealdtech/chaind`. Companion alerts live in `attestantio/ops`. + +## Context and Problem Statement + +`services/summarizer/standard.summarizeEpoch` returns `(false, nil)` when validator balances for an epoch are unavailable. The outer `summarizeEpochs` exits the loop silently with only a `Debug` log. Because two downstream pipelines (`summarizeBlocks`, `summarizeValidators`) cap their target epoch at `md.LastEpoch`, the silent skip freezes the entire summarizer until the underlying data condition resolves. The 2026-04-07 Hoodi incident produced a multi-week Grafana gap before any human noticed. + +We need stall observability that an operator can alert on within minutes. The shape of that observability is not obvious; several reasonable designs exist. + +## Decision Drivers + +- An alert must fire within minutes of any stall, not after a long-tail dashboard review. +- The signal should generalize across all stall shapes — including persistent errors in the `summarizeBlocks` and `summarizeValidators` pipelines, not only the `(false, nil)` site that triggered the original incident. +- Operators already reason about consumer-lag and replication-lag metrics; the new metric should fit that mental model so runbooks stay simple. +- Public Prometheus metric shape is hard to reverse — renaming or relabelling is a breaking change for downstream alerts, so the choice must be deliberate. + +## Considered Options + +1. **A single labelled lag gauge** — `chaind_summarizer_lag_epochs{pipeline}`. +2. **Original plan: `stalled_epoch` gauge + `stall_ticks_total{reason}` counter** — a paired stall-event metric pinned to the silent-skip site. +3. 
**Hybrid: lag gauge + `silent_skip_total{reason}` counter** — Option 1 plus an event counter for forensic detail. 4. **Pre-populate the gauge at service startup via `eth2Client.Finality()`** — variant of Option 1 that closes the cold-start blind spot. ## Decision Outcome Chosen option: **Option 1 — a single labelled lag gauge**, because it surfaces every stall shape the summarizer can produce (not just the original `(false, nil)` site), collapses alerting to a single rule that matches operator intuition, and avoids coupling Prometheus cardinality to an internal enum that may evolve. ### Implementation notes Stall state is exposed as a single Prometheus `GaugeVec`: ``` chaind_summarizer_lag_epochs{pipeline} = float64(targetEpoch) - float64(md.LastXEpoch) ``` where the `pipeline` label takes one of three values: `epoch`, `block`, or `validator`. The gauge is updated by a private helper `updateLagGauges(ctx, finalizedEpoch)`, registered via `defer` immediately after the activity semaphore is acquired in `OnFinalityUpdated`, so it fires on every invocation that passes admission, including all error-return paths. The pure-compute half (`setLagGauges`) is split out as a testable seam: internal-package unit tests drive the gauge math with synthetic metadata without standing up a database. Disabled pipelines (e.g. when `s.blockSummaries == false`) never call `WithLabelValues(...)` and so omit their series entirely; the alert query naturally ignores absent labels. The `float64` cast is deliberate: a `uint64` subtraction would underflow to ~`1.8e19` if metadata is briefly ahead of finality. Per-event diagnostic detail (which epoch is stuck, which silent-skip branch fired) lives in `Warn` logs at the two silent-skip sites — *not* in a counter, *not* in a `reason` label. The paging signal is the lag gauge alert; the logs are forensic context that becomes useful once the alert fires. ### Positive consequences - One alert rule (`chaind_summarizer_lag_epochs > 2 for 15m`) covers every stall shape the summarizer can produce, including future ones. - Operator mental model matches existing consumer-lag and replication-lag metrics elsewhere in the fleet. - Metric maintenance is a single series with a small label set instead of two metrics with overlapping semantics. - High-cardinality diagnostic detail lives in logs, where it belongs, without forcing Prometheus to track an enum that may grow. ### Negative consequences - **The metric is part of the public contract** with operators. Renaming the metric or its labels is a breaking change for anyone running the alert rule. If we ever need to evolve the shape, we add a new metric alongside this one and deprecate the old. - **Alert wording is coupled to log message text.** The runbook annotation references the literal `Warn` strings: - `"No validator balances available; cannot summarize epoch (will retry on next finality tick)"` (from `epoch.go`, the `len(balances) == 0` branch). - `"Not enough data to update summary; will retry on next finality tick"` (from `handler.go`, the `!updated` branch in `summarizeEpochs`). Changing those strings without updating the alert annotation breaks operator UX. The implementation includes an `Alert annotation contract:` code comment near each log line and on the `summarizerLagEpochs` variable declaration in `metrics.go`, so the contract is visible at the point of edit. - **No per-event counter** means we cannot trivially answer "how often is the silent-skip path firing across the fleet?"
That question, if it arises, will be answered from log-based metrics (e.g. Loki/Mimir) or from sampling logs in incidents. If the question becomes recurrent, that is itself a signal that we should re-introduce a counter — superseding this ADR rather than silently retrofitting one. - **A six-minute cold-start blind spot** exists between service startup and the first `OnFinalityUpdated` tick, during which the gauge has no value. This is an accepted trade-off (see Option 4 below) covered by the existing alert on a flat `chaind_summarizer_epochs_processed_total`. ## Pros and Cons of the Options ### Option 1 — Single labelled lag gauge (chosen) - **Good** — generalizes across all summarizer stall shapes, including future ones. - **Good** — alert rules collapse to a single `> 2 for 15m` query. - **Good** — matches operator intuition for lag-style metrics. - **Bad** — does not provide a per-event count of silent-skip occurrences; that question is deferred to logs. ### Option 2 — `stalled_epoch` gauge + `stall_ticks_total{reason}` counter (rejected) - **Good** — explicit per-incident counter is convenient for "how many times did this fire" questions. - **Bad** — only fires on the `(false, nil)` path inside `summarizeEpoch`. A persistent error inside `summarizeBlocksInEpoch` that causes `LastBlockEpoch` to fall behind `LastEpoch` would leave the gauge at zero and the alert silent — which is the original incident shape we are trying to prevent. - **Bad** — the `reason` enum couples public metric cardinality to an internal classification we may want to evolve. Adding or renaming reasons becomes a breaking-change concern. ### Option 3 — Hybrid (lag gauge + `silent_skip_total{reason}` counter, rejected) - **Good** — preserves the generalized alert from Option 1 while keeping a per-event counter. - **Bad** — the forensic information the counter provides is already in the new `Warn` logs at the same sites. A per-event counter duplicates the log signal at higher cost (Prometheus cardinality, label maintenance, dashboard panels to ignore) without giving operators anything the logs do not. ### Option 4 — Pre-populate the gauge at service startup via `eth2Client.Finality()` (rejected) - **Good** — closes the six-minute window after restart during which the gauge has no value. - **Bad** — engineering for an unlikely failure shape ("chaind never receives a finality tick at all"). That shape is already covered by alerts on flat `chaind_summarizer_epochs_processed_total`. - **Bad** — adds network I/O to the bootstrap path, creating new failure modes (slow or failing Beacon node at startup) for the small benefit of populating one gauge value six minutes earlier. ## Links - Incident: 2026-04-07 chaindb-d04 outage (multi-week silent stall before detection) - Implementation: chaind commit `066bb38` ("Add summarizer lag observability") — registers `chaind_summarizer_lag_epochs`, adds the two `Warn` log sites, and ships the `updateLagGauges` / `setLagGauges` split - Companion alert rule: `attestantio/ops` work item — deploys the `chaind_summarizer_lag_epochs > 2 for 15m` alert against the strings above ## Amendment 1 — cursor-diff formula and zero-balance guard (2026-05-05) * Status: Accepted, amends the Decision Outcome and Implementation Notes above. ### Motivation The original formula `targetEpoch - md.LastXEpoch` was structurally incapable of detecting the silent-corruption mode the alert was meant to catch.
End-to-end smoke verification on a seeded chaind instance — DELETE all `t_validator_balances` rows for one finalized epoch, observe behavior — established the failure shape empirically: the gauge stayed pinned at `0/0/0` throughout, neither alert-contract `Warn` log fired, the summarizer cursor advanced past the corrupt epoch, and `t_epoch_summaries.f_active_balance` was silently written as `0` for the affected epoch. + +Root cause is in `services/chaindb/postgresql/validators.go` (`ValidatorBalancesByEpoch`): a `LEFT JOIN` of `t_validators` against the per-epoch balance subquery, with `COALESCE(f_balance, 0)`. When no balance rows exist for an epoch, the function returns ~1.3M rows of zeros instead of an empty slice — so the `len(balances) == 0` guard in `summarizeEpoch` never trips in any production state where `t_validators` is populated. The cursor advances unconditionally, the gauge stays at zero, the alert never fires, and corrupted summaries are written. + +The `0` reading is fatally ambiguous for an alert metric — it can mean "summarizer is healthy" or "summarizer is silently corrupting data" with no way for a downstream consumer to distinguish them. + +### Amended formula + +The metric name (`chaind_summarizer_lag_epochs`), label key (`pipeline`), label values (`epoch`/`block`/`validator`), and gauge registration are **unchanged** — this is a formula amendment, not a metric replacement. Each pipeline now measures its lag against its **direct** upstream cursor: + +| label | new formula | meaning | +|---|---|---| +| `pipeline=epoch` | `validators.latest_balances_epoch - summarizer.latest_epoch` | how far behind the upstream balances cursor the epoch-summary cursor is | +| `pipeline=block` | `summarizer.latest_epoch - summarizer.latest_block_epoch` | how far behind the epoch-summary cursor the block-summary sub-pipeline is | +| `pipeline=validator` | `summarizer.latest_epoch - summarizer.latest_validator_epoch` | how far behind the epoch-summary cursor the validator-summary sub-pipeline is | + +The reading-side now fetches both `summarizer.standard` and `validators.standard` rows from `t_metadata` per finality tick. Diffs are clamped to `[0, +Inf)` via a small helper to absorb the benign race window where a downstream cursor is read ahead of its upstream across two metadata reads. + +### Defensive zero-balance assertion + +Independently of the gauge change, `summarizeEpoch` now refuses to write a corrupt summary. After the active-validator balance accumulation loop and before `BeginTx`, if `summary.ActiveValidators > 0 && summary.ActiveBalance == 0` the function emits the existing alert-contract `Warn` log (verbatim, single-sourced with the bootstrap-path `len(balances) == 0` branch above it) and returns `(false, nil)`. The cursor stays put, the cycle retries on the next finality tick, and the row is never written to `t_epoch_summaries`. This keeps the data-integrity guarantee even if the alert is missed. + +### Why amend instead of supersede + +The metric/label/registration contract is unchanged — operators running the existing alert rule do not need to migrate. The formula change is a defect fix that recovers the *originally intended* semantics ("detect when the balance pipeline falls behind the summarizer"), not a redesign. Per the original "Public Prometheus metric shape is hard to reverse" decision driver, supersession is reserved for breaking changes to the operator-facing contract. + +### Why cursor-diff over the original formula + +1. 
**Decoupled from cycle outcomes.** Whether a finality cycle errors, runs with garbage, succeeds, or is skipped, the gauge measures actual cursor-state divergence at read time. It cannot lie about lag because it isn't computed from the lossy cycle-completion signal. +2. **Disambiguates `0` readings.** A `0` now means exactly one thing: "downstream cursor matches its upstream right now." +3. **Per-pipeline labels carry distinct diagnostic value.** + - `epoch` lag growing → upstream balances pipeline is stalled (network, API, validators-pipeline crash). Action: investigate `services/validators/standard/`. + - `block` lag growing → block-summary sub-pipeline is internally lagging behind epoch summarization (DB write contention, slow query). Action: investigate `summarizeBlocksInEpoch`. + - `validator` lag growing → same diagnosis pattern but for the validator sub-pipeline (`summarizeValidatorsInEpoch`). +4. **Alert thresholds become operationally direct.** "lag > N for M minutes" reads as "the upstream pipeline has been ≥N epochs ahead of the downstream pipeline for ≥M minutes" — a direct operational statement, not a proxy reading via cycle-completion. + +### Considered options for the fix + +- **Option A** — Add an "all-zero balances" guard inside `summarizeEpoch`. Adopted (the defensive assertion above). +- **Option B** — Change `ValidatorBalancesByEpoch` itself to return a `nil` slice when no rows. Rejected. The `LEFT JOIN` exists for a real reason: zero-balance validators don't have rows; the JOIN ensures every validator is represented in the result. Touching the SQL risks regressing the legitimate case. The defect is in the summarizer trusting the all-zero answer at face value, so the guard belongs at the summarizer layer where it can check the business invariant (`ActiveBalance > 0` when `ActiveValidators > 0`) rather than a structural property of the SQL. +- **Option C** — Re-instrument the lag gauge with cursor-diff math. Adopted (this amendment). + +A and C ship together: C ensures the alert fires; A ensures no corrupt summary is written even if the alert is missed. Belt and suspenders. + +### Operator-facing changes + +Metric name, labels, and registration are unchanged — no migration required. **The threshold *meaning* has changed**: the gauge now reports data-cursor divergence rather than cycle-completion lag. For the existing `> 2 for 15m` rule the practical behavior is similar (both fire when the summarizer falls behind), but the new signal is sensitive to the previously-undetectable silent-corruption mode. Companion alert spec in `attestantio/ops` should reference the amended semantics; per-pipeline diagnosis hints become useful in the runbook because the labels now carry genuinely distinct failure surfaces. diff --git a/docs/adr/0002-validator-balance-fetcher-recovery-model.md b/docs/adr/0002-validator-balance-fetcher-recovery-model.md new file mode 100644 index 0000000..72ab197 --- /dev/null +++ b/docs/adr/0002-validator-balance-fetcher-recovery-model.md @@ -0,0 +1,252 @@ +# 0002 — Validator-balance fetcher recovery model + +* Status: Accepted (2026-05-07). +* Date: 2026-05-07 +* Scope: `services/validators/standard` in `wealdtech/chaind`. Adjacent + consumer: `services/summarizer/standard.summarizeEpoch` (reads + `t_validator_balances` via `ValidatorBalancesByEpoch`). + +## Context and Problem Statement + +The validator-balance fetcher in `services/validators/standard` is +responsible for keeping `t_validator_balances` in lock-step with finalised +epochs. 
Its persistence-and-recovery contract was never written down. The 2026-04-07 `chaindb-d04` outage and the subsequent root-cause investigation revealed two distinct contract questions that future maintainers will re-derive from code unless they are recorded: 1. **Persistence atomicity** — under what failure modes can the fetcher's own cursor (`md.LatestBalancesEpoch`) end up pointing past an epoch for which no balance rows are durable in `t_validator_balances`? 2. **Recovery on restart** — when chaind restarts, what guarantees does the fetcher provide about epochs that elapsed during the downtime? The investigation surfaced that the answer to (1) depends on which upstream failure mode triggers it: kill-9 crashes are safe, but a "beacon returned empty 200 OK" path can advance the cursor without any balance rows being persisted, producing 113 historical clusters of silent corruption (828 epochs across 9 months on a production chaindb instance). The answer to (2) is "cursor-trusting, not gap-detecting" — sound under crash recovery (because of (1)'s atomicity) but unsound under any other gap source. This ADR records the contract as it **should** be and names the existing points where the implementation matches and where it does not. The implementation gap on the empty-response path is tracked as a follow-up work item. ## Decision Drivers - A future engineer modifying the fetcher must be able to reason about what is and is not promised, without re-running the root-cause investigation. - The recorded contract must hold across the full set of upstream failure modes — not just kill-9 — because production data shows multiple shapes have occurred. - The contract must compose with the summarizer's downstream invariants (post-`1edcb45` the summarizer assumes "if balances are present they are correct"; the fetcher must not produce missing-but-cursor-claims-present states). - The contract should be expressible in a few sentences; if it cannot, the design is over-complex. ## Considered Options 1. **Cursor-trusting recovery with empty-response guarded** — atomic co-commit of cursor + balance rows; recovery on restart resumes from `LatestBalancesEpoch+1` with no row-count reconciliation; a `len(validatorsResponse.Data) == 0` guard rejects empty beacon responses by returning an error so the transaction rolls back and the cursor stays put (a minimal sketch follows this list). *(The contract this ADR adopts.)* 2. **Cursor-trusting recovery without an empty-response guard** — atomic co-commit of cursor + balance rows; recovery resumes from `LatestBalancesEpoch+1`; empty beacon responses silently advance the cursor with zero rows persisted. *(Today's behaviour; this is the bug the investigation identified.)* 3. **Gap-detecting recovery** — on restart, scan `t_validator_balances` for missing epochs in the `[startup_floor, LatestBalancesEpoch]` range and re-fetch them from the beacon. Replace cursor-trusting with row-count reconciliation. 4. **Eager re-fetch via a `MissedEpochs` queue** — populate `metadata.MissedEpochs` whenever a fetch fails, retry queued epochs on subsequent ticks until empty. Apparently the original design intent of the field (the field is declared in `services/validators/standard/metadata.go:28`, JSON-migrated, but never populated).
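The option 1 guard is small; the sketch below shows its intended shape. This is a hedged illustration, not chaind's real API: `validatorDatum`, `guardBalancesResponse`, and `errEmptyBeaconResponse` are hypothetical names, and the real fix is the follow-up work item that wires an equivalent check into the fetch path before the transaction commits.

```go
package main

import (
	"errors"
	"fmt"
)

// validatorDatum stands in for one entry of the beacon's validators response.
type validatorDatum struct {
	Balance uint64
}

// errEmptyBeaconResponse marks the fetch as failed. Returned up the fetch
// path, it aborts the transaction, so neither balance rows nor the
// md.LatestBalancesEpoch cursor commit; the next finality tick retries.
var errEmptyBeaconResponse = errors.New("beacon returned no usable validator balances")

// guardBalancesResponse rejects the two empty-response shapes the
// investigation identified: zero validators, and all balances zero.
func guardBalancesResponse(data []validatorDatum) error {
	if len(data) == 0 {
		return errEmptyBeaconResponse
	}
	for _, v := range data {
		if v.Balance != 0 {
			// At least one real balance: safe to persist and advance the cursor.
			return nil
		}
	}
	return errEmptyBeaconResponse
}

func main() {
	fmt.Println(guardBalancesResponse(nil))                               // fetch failure
	fmt.Println(guardBalancesResponse([]validatorDatum{{Balance: 0}}))    // fetch failure
	fmt.Println(guardBalancesResponse([]validatorDatum{{Balance: 32e9}})) // <nil>: healthy response
}
```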
+ +## Decision Outcome + +Chosen option: **Option 1 — cursor-trusting recovery with +empty-response guarded.** + +### Rationale + +- Option 1 is the smallest delta from current code that closes the + observed silent-corruption pathway. The atomic co-commit + invariant from option 2 is preserved; only the empty-response path + is hardened. +- Option 3 (gap-detecting recovery) is more robust but substantially + more invasive: it requires per-epoch row-count semantics in + `chaindb.ValidatorsProvider`, a new schema migration to record + per-epoch row-count expectations, and a startup phase that scans a + large table. The cost is not justified by the production data — + Option 1 catches every observed failure mode. +- Option 4 (eager re-fetch via `MissedEpochs`) is the apparent + original design intent and is consistent with the field's presence + in `metadata.go`. However, the `MissedEpochs` consumer code in the + *proposerduties* service (`services/proposerduties/standard/service.go:166-204`) + reads from a permanently-empty list — the population logic was + never written for any service. Reviving this path would require + designing the population logic from scratch, with no existing + precedent to follow. Option 1 is simpler and addresses the same + failure modes. The `MissedEpochs` field is documented as + deprecated (in this slice) so that future maintainers do not + accidentally activate dormant consumer code. + +### The contract — "cursor-trusting recovery with empty-response guarded" + +In one sentence: **`md.LatestBalancesEpoch` advances if and only if +`t_validator_balances` contains rows for the corresponding epoch in the +same database transaction**. + +Decomposed into three claims: + +1. **Atomic co-commit (already enforced)**: `SetValidatorBalances`, + `setMetadata` (which writes the new cursor value), and + `CommitTx` all run inside a single Postgres transaction in + `onEpochTransitionValidatorBalancesForEpoch` + (`services/validators/standard/handler.go:198-243`). A crash at + any point pre-commit rolls back both writes; a successful commit + makes both visible together. Verified empirically by a kill-9 + reproducer. + +2. **No-empty-write (must be enforced; currently not)**: a beacon + response with zero validators or with all validators having + `Balance == 0` must be treated as a fetch failure, not a fetch + success. The fetcher must return an error and not advance the + cursor. The next finality tick will retry. *This is the + investigation-identified defect; a follow-up brings the + implementation into compliance.* + +3. **Cursor-trusting recovery (already enforced)**: on chaind restart, + `updateAfterRestart` (`services/validators/standard/service.go:84-137`) + calls `onEpochTransitionValidatorBalances(ctx, md, currentEpoch)`, + which loops from `md.LatestBalancesEpoch+1` to the current epoch + and re-fetches each. No row-count reconciliation; the cursor is + the authoritative record of "what has been persisted." With (1) + and (2) holding, this is sound. + +### What this contract does NOT promise + +- **Recovery from "beacon-state retention exceeded"**: if chaind is + off-air for longer than the beacon's state-retention horizon + (typically 8 epochs for Prysm with default flags, ~2 weeks for + Lighthouse), the beacon may return empty responses for the missed + epochs. Under this contract, the fetcher will stall (refusing to + advance the cursor) and the lag gauge from ADR 0001 will fire the + alert. 
The operator must then either repair the beacon (e.g., point at a different endpoint with longer retention) or explicitly acknowledge the data loss with a manual cursor advance. This is the *correct* behaviour for a chronic upstream condition; turning it into automated data loss is a separate decision and is not part of this contract. - **Per-validator row-count completeness**: the fetcher writes one row per validator with non-zero balance. Validators with `Balance == 0` are intentionally not stored (see handler.go:206-208). Downstream consumers must not assume every validator index has a row in `t_validator_balances` for every epoch. The summarizer's zero-balance assertion (commits `1edcb45` and `8d9e6f2`) handles this correctly. ### Implementation notes The bug-confirming test `services/validators/standard/handler_internal_test.go` exercises all three contract claims: - **Claim 1 (atomic co-commit)**: covered by the *beacon error* case, which asserts the cursor is not advanced and no transaction is committed when `Validators()` returns an error. - **Claim 2 (no-empty-write)**: covered by the *empty validators map* and *all-zero balances* cases, both of which currently document the bug (test passes against current code where the cursor *does* advance). When the follow-up fix lands, the assertions in those cases invert from "cursor advanced" to "cursor unchanged, error returned" and the test serves as the regression boundary for the contract. - **Claim 3 (cursor-trusting recovery)** is exercised end-to-end by the existing integration smoke tests on the local Hoodi playbook and the kill-9 reproducer. No further unit-level coverage is added here. ### The retention-pruning interaction (related but separate) The pruner consults the *day-summary* cursor, not the *epoch-summarizer* cursor (`services/summarizer/standard/prune.go:64-67`). The proxy holds because day summaries entail epoch summaries, but the contract is implicit. The investigation's code-reading audit flagged this as a residual risk; the kill-9 reproducer found no exploit; the production forensic data shows no pattern consistent with retention-race triggering. No contract change is filed here. If the empty-response amplifier had been refuted as a load-bearing cause, the retention-ordering question would have been the next target — but it was not, so this ADR does not expand its scope. ### Positive consequences - The fetcher's promised behaviour matches the summarizer's assumptions on `t_validator_balances` content. The two services compose without unstated invariants. - A future maintainer modifying `services/validators/standard/handler.go` can read this ADR and understand the contract without re-running the investigation. - The contract scopes the fetcher's responsibility narrowly: it guarantees consistency between cursor and rows, but does not guarantee freshness (the summarizer's lag gauge from ADR 0001 is the freshness signal). ### Negative consequences / accepted trade-offs - The contract requires the fetcher to **stall** (cursor not advancing) on chronic upstream failures. Operators who would prefer "skip the bad epochs and keep going" must take that decision explicitly via a manual cursor advance — chaind will not silently absorb data loss. This is a deliberate UX choice.
- Contract claim (2) is tested only against synthetic stubs in this slice; the post-fix end-to-end behaviour (production beacon flap → fetcher stall → lag gauge alert → operator-driven recovery) is not exercised by automated tests. It could be exercised in a future slice by extending the Hoodi playbook with a "beacon empties for 2 minutes" mode. ### Rejected alternatives — why not - **Option 2 (no empty-response guard)** is the current (buggy) state. Production data showed it is responsible for the silent-corruption pathway across 113 historical clusters. Cannot stand as the documented contract. - **Option 3 (gap-detecting recovery)** is more invasive than the data justifies. Disk and CPU cost of the startup scan, plus a new schema-migration burden, plus operator-visible latency on chaind startup. Reconsider only if a future verdict identifies a failure mode that Option 1 does not catch. - **Option 4 (`MissedEpochs` queue)** may have been the intended design, but the consumer code never had a population path, so reviving it would mean re-deriving the design from scratch. That lack of precedent makes it no cheaper than Option 1. See the field's deprecation note in `services/validators/standard/metadata.go`. ## Open questions for upstream review When this ADR is reviewed by `wealdtech/chaind` maintainers (alongside the no-empty-write fix PR that brings the implementation into compliance): - Confirm the MADR-template style choice from ADR 0001 is acceptable for ADR 0002 as well. No prior style precedent existed at ADR 0001's authorship; this ADR follows it. - Confirm Option 1 is the preferred contract. If the maintainers prefer the gap-detecting recovery (Option 3) for some reason this ADR did not anticipate, the contract changes and the fix scope expands. diff --git a/docs/prometheus.md b/docs/prometheus.md index 9d44d63..b405ec1 100644 --- a/docs/prometheus.md +++ b/docs/prometheus.md @@ -24,6 +24,13 @@ Operations metrics provide information about numbers of operations performed. T - `chaind_finalizer_latest_epoch` latest epoch processed by the finalizer module this run of chaind - `chaind_proposerduties_epochs_processed` number of epochs processed by the proposer duties module this run of chaind - `chaind_proposerduties_latest_epoch` latest epoch processed by the proposer duties module this run of chaind + - `chaind_summarizer_epochs_processed_total` number of epochs processed by the summarizer module this run of chaind + - `chaind_summarizer_latest_epoch` latest epoch processed by the summarizer module this run of chaind + - `chaind_summarizer_days_processed_total` number of days processed by the daily-rollup submodule of the summarizer module this run of chaind + - `chaind_summarizer_latest_day` latest day (Unix timestamp of midnight UTC) processed by the daily-rollup submodule of the summarizer module this run of chaind + - `chaind_summarizer_balance_prune_ts` Unix timestamp of the last validator-balance prune run by the summarizer module + - `chaind_summarizer_epoch_prune_ts` Unix timestamp of the last epoch-summary prune run by the summarizer module + - `chaind_summarizer_lag_epochs{pipeline="epoch|block|validator"}` number of epochs by which each summarizer pipeline lags its direct upstream cursor: `pipeline=epoch` is the gap between `validators.latest_balances_epoch` and the summarizer's epoch cursor (i.e.
how far the upstream balances pipeline is ahead of summarization); `pipeline=block` and `pipeline=validator` are the gaps between the summarizer's epoch cursor and its own block / validator sub-pipeline cursors. Values are clamped at zero — disabled pipelines (e.g. `summarizer.blocks.enable=false`) produce no series for that label - `chaind_validators_epochs_processed` number of epochs processed by the validators module this run of chaind - `chaind_validators_latest_epoch` latest epoch processed by the validators module this run of chaind - `chaind_validators_balances_epochs_processed` number of epochs processed by the balances submodule of the validators module this run of chaind diff --git a/go.mod b/go.mod index 747f2d8..9df6534 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/prysmaticlabs/go-bitfield v0.0.0-20240618144021-706c95b2dd15 github.com/rs/zerolog v1.33.0 - github.com/sasha-s/go-deadlock v0.3.5 + github.com/sasha-s/go-deadlock v0.3.9 github.com/shopspring/decimal v1.4.0 github.com/spf13/pflag v1.0.6 github.com/spf13/viper v1.19.0 @@ -64,6 +64,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/magiconair/properties v1.8.9 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -71,7 +72,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect - github.com/petermattis/goid v0.0.0-20250121172306-05bcfb9a85dc // indirect + github.com/petermattis/goid v0.0.0-20250813065127-a731cc31b4fe // indirect github.com/pk910/dynamic-ssz v0.0.5 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/go.sum b/go.sum index 8336c0c..0e7c8aa 100644 --- a/go.sum +++ b/go.sum @@ -317,9 +317,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= -github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= -github.com/petermattis/goid v0.0.0-20250121172306-05bcfb9a85dc h1:Xz/LkK9AJRY5QTkA1uE1faB8yeqRFjeKgwDtI13ogcY= -github.com/petermattis/goid v0.0.0-20250121172306-05bcfb9a85dc/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= +github.com/petermattis/goid v0.0.0-20250813065127-a731cc31b4fe h1:vHpqOnPlnkba8iSxU4j/CvDSS9J4+F4473esQsYLGoE= +github.com/petermattis/goid v0.0.0-20250813065127-a731cc31b4fe/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pk910/dynamic-ssz v0.0.5 h1:VP9heGYUwzlpyhk28P2nCAzhvGsePJOOOO5vQMDh2qQ= github.com/pk910/dynamic-ssz v0.0.5/go.mod h1:b6CrLaB2X7pYA+OSEEbkgXDEcRnjLOZIxZTsMuO/Y9c= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -355,8 +354,8 @@ github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsF github.com/sagikazarmark/locafero v0.7.0/go.mod 
h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= -github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= -github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= +github.com/sasha-s/go-deadlock v0.3.9 h1:fiaT9rB7g5sr5ddNZvlwheclN9IP86eFW9WgqlEQV+w= +github.com/sasha-s/go-deadlock v0.3.9/go.mod h1:KuZj51ZFmx42q/mPaYbRk0P1xcwe697zsJKE03vD4/Y= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= diff --git a/services/beaconcommittees/standard/parameters.go b/services/beaconcommittees/standard/parameters.go index 1ed5a57..2b36afb 100644 --- a/services/beaconcommittees/standard/parameters.go +++ b/services/beaconcommittees/standard/parameters.go @@ -102,11 +102,11 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) { } // Ensure the eth2client can handle our requirements. if _, isProvider := parameters.eth2Client.(eth2client.BeaconCommitteesProvider); !isProvider { - //nolint:stylecheck + //nolint:staticcheck return nil, errors.New("Ethereum 2 client does not provide beacon committee information") // skipcq: SCC-ST1005 } if _, isProvider := parameters.eth2Client.(eth2client.EventsProvider); !isProvider { - //nolint:stylecheck + //nolint:staticcheck return nil, errors.New("Ethereum 2 client does not provide events") // skipcq: SCC-ST1005 } if parameters.chainDB == nil { diff --git a/services/chaindb/postgresql/aggregatevalidatorbalances.go b/services/chaindb/postgresql/aggregatevalidatorbalances.go index 3bee86a..46c7932 100644 --- a/services/chaindb/postgresql/aggregatevalidatorbalances.go +++ b/services/chaindb/postgresql/aggregatevalidatorbalances.go @@ -16,7 +16,7 @@ package postgresql import ( "context" "fmt" - "sort" + "slices" "strings" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -215,9 +215,7 @@ func (s *Service) AggregateValidatorBalancesByIndexAndEpochs( // This allows us to form a query that is significantly faster than the simple IN() style. func fastIndices(validatorIndices []phase0.ValidatorIndex) string { // Sort the validator indices. - sort.Slice(validatorIndices, func(i, j int) bool { - return validatorIndices[i] < validatorIndices[j] - }) + slices.Sort(validatorIndices) // Create an array for the validator indices. This gives us higher performance for our query. 
indices := make([]string, len(validatorIndices)) diff --git a/services/chaindb/postgresql/attestations.go b/services/chaindb/postgresql/attestations.go index eb5e222..e6c9337 100644 --- a/services/chaindb/postgresql/attestations.go +++ b/services/chaindb/postgresql/attestations.go @@ -717,8 +717,8 @@ ORDER BY f_inclusion_slot DESC,f_inclusion_index DESC`) if filter.Limit > 0 { queryVals = append(queryVals, filter.Limit) - queryBuilder.WriteString(fmt.Sprintf(` -LIMIT $%d`, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +LIMIT $%d`, len(queryVals)) } if e := log.Trace(); e.Enabled() { diff --git a/services/chaindb/postgresql/beaconcommittees.go b/services/chaindb/postgresql/beaconcommittees.go index 4573898..558e097 100644 --- a/services/chaindb/postgresql/beaconcommittees.go +++ b/services/chaindb/postgresql/beaconcommittees.go @@ -88,22 +88,22 @@ FROM t_beacon_committees`) if filter.From != nil { queryVals = append(queryVals, *filter.From) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_slot >= $%d`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_slot >= $%d`, wherestr, len(queryVals)) wherestr = " AND" } if filter.To != nil { queryVals = append(queryVals, *filter.To) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_slot <= $%d`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_slot <= $%d`, wherestr, len(queryVals)) wherestr = " AND" } if len(filter.CommitteeIndices) > 0 { queryVals = append(queryVals, filter.CommitteeIndices) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_index = ANY($%d)`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_index = ANY($%d)`, wherestr, len(queryVals)) } switch filter.Order { @@ -119,8 +119,8 @@ ORDER BY f_slot DESC,f_committee DESC`) if filter.Limit > 0 { queryVals = append(queryVals, filter.Limit) - queryBuilder.WriteString(fmt.Sprintf(` -LIMIT $%d`, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +LIMIT $%d`, len(queryVals)) } if e := log.Trace(); e.Enabled() { diff --git a/services/chaindb/postgresql/blobsidecars.go b/services/chaindb/postgresql/blobsidecars.go index c64f49b..92f1bdd 100644 --- a/services/chaindb/postgresql/blobsidecars.go +++ b/services/chaindb/postgresql/blobsidecars.go @@ -46,7 +46,7 @@ func (s *Service) BlobSidecars(ctx context.Context, // Build the query. 
queryBuilder := strings.Builder{} - queryVals := make([]interface{}, 0) + queryVals := make([]any, 0) queryBuilder.WriteString(` SELECT f_block_root diff --git a/services/chaindb/postgresql/blocks.go b/services/chaindb/postgresql/blocks.go index 3927b95..a3d3fa9 100644 --- a/services/chaindb/postgresql/blocks.go +++ b/services/chaindb/postgresql/blocks.go @@ -151,21 +151,21 @@ FROM t_blocks`) if filter.From != nil { queryVals = append(queryVals, *filter.From) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_slot >= $%d`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_slot >= $%d`, wherestr, len(queryVals)) wherestr = " AND" } if filter.To != nil { queryVals = append(queryVals, *filter.To) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_slot <= $%d`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_slot <= $%d`, wherestr, len(queryVals)) } if filter.Canonical != nil { queryVals = append(queryVals, *filter.Canonical) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_canonical = $%d`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_canonical = $%d`, wherestr, len(queryVals)) } switch filter.Order { @@ -181,8 +181,8 @@ ORDER BY f_slot DESC,f_root DESC`) if filter.Limit > 0 { queryVals = append(queryVals, filter.Limit) - queryBuilder.WriteString(fmt.Sprintf(` -LIMIT $%d`, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +LIMIT $%d`, len(queryVals)) } if e := log.Trace(); e.Enabled() { @@ -892,9 +892,6 @@ func (s *Service) LatestBlocks(ctx context.Context) ([]*chaindb.Block, error) { copy(block.BlobKZGCommitments[i][:], blobKZGCommitments[i]) } } - if err != nil { - return nil, err - } blocks = append(blocks, block) } diff --git a/services/chaindb/postgresql/chainspec.go b/services/chaindb/postgresql/chainspec.go index bf1bd8f..af11150 100644 --- a/services/chaindb/postgresql/chainspec.go +++ b/services/chaindb/postgresql/chainspec.go @@ -160,8 +160,8 @@ func dbValToSpec(_ context.Context, key string, val string) any { } // Handle hex strings. - if strings.HasPrefix(val, "0x") { - byteVal, err := hex.DecodeString(strings.TrimPrefix(val, "0x")) + if after, ok := strings.CutPrefix(val, "0x"); ok { + byteVal, err := hex.DecodeString(after) if err == nil { return byteVal } diff --git a/services/chaindb/postgresql/proposerduties.go b/services/chaindb/postgresql/proposerduties.go index 755bfeb..8c90e69 100644 --- a/services/chaindb/postgresql/proposerduties.go +++ b/services/chaindb/postgresql/proposerduties.go @@ -166,7 +166,7 @@ func (s *Service) ProposerDuties(ctx context.Context, filter *chaindb.ProposerDu // Build the query. 
queryBuilder := strings.Builder{} - queryVals := make([]interface{}, 0) + queryVals := make([]any, 0) queryBuilder.WriteString(` SELECT f_slot diff --git a/services/chaindb/postgresql/setblobsidecars.go b/services/chaindb/postgresql/setblobsidecars.go index 9f0c92d..06449dd 100644 --- a/services/chaindb/postgresql/setblobsidecars.go +++ b/services/chaindb/postgresql/setblobsidecars.go @@ -50,7 +50,7 @@ func (s *Service) SetBlobSidecars(ctx context.Context, blobSidecars []*chaindb.B "f_kzg_proof", "f_kzg_commitment_inclusion_proof", }, - pgx.CopyFromSlice(len(blobSidecars), func(i int) ([]interface{}, error) { + pgx.CopyFromSlice(len(blobSidecars), func(i int) ([]any, error) { var blob *[]byte if len(blobSidecars[i].Blob) > 0 { blobBytes := blobSidecars[i].Blob[:] @@ -63,7 +63,7 @@ func (s *Service) SetBlobSidecars(ctx context.Context, blobSidecars []*chaindb.B kzgCommitmentInclusionProof = append(kzgCommitmentInclusionProof, blobSidecars[i].KZGCommitmentInclusionProof[j][:]...) } - return []interface{}{ + return []any{ blobSidecars[i].InclusionBlockRoot[:], blobSidecars[i].InclusionSlot, blobSidecars[i].InclusionIndex, diff --git a/services/chaindb/postgresql/setconsolidationrequests.go b/services/chaindb/postgresql/setconsolidationrequests.go index cafea0d..42cf869 100644 --- a/services/chaindb/postgresql/setconsolidationrequests.go +++ b/services/chaindb/postgresql/setconsolidationrequests.go @@ -48,8 +48,8 @@ func (s *Service) SetConsolidationRequests(ctx context.Context, requests []*chai "f_source_pubkey", "f_target_pubkey", }, - pgx.CopyFromSlice(len(requests), func(i int) ([]interface{}, error) { - return []interface{}{ + pgx.CopyFromSlice(len(requests), func(i int) ([]any, error) { + return []any{ requests[i].InclusionBlockRoot[:], requests[i].InclusionSlot, requests[i].InclusionIndex, diff --git a/services/chaindb/postgresql/setdepositrequests.go b/services/chaindb/postgresql/setdepositrequests.go index 67a68a3..7fb7d1f 100644 --- a/services/chaindb/postgresql/setdepositrequests.go +++ b/services/chaindb/postgresql/setdepositrequests.go @@ -50,8 +50,8 @@ func (s *Service) SetDepositRequests(ctx context.Context, requests []*chaindb.De "f_signature", "f_deposit_index", }, - pgx.CopyFromSlice(len(requests), func(i int) ([]interface{}, error) { - return []interface{}{ + pgx.CopyFromSlice(len(requests), func(i int) ([]any, error) { + return []any{ requests[i].InclusionBlockRoot[:], requests[i].InclusionSlot, requests[i].InclusionIndex, diff --git a/services/chaindb/postgresql/setwithdrawalrequests.go b/services/chaindb/postgresql/setwithdrawalrequests.go index 001e900..31d9dc6 100644 --- a/services/chaindb/postgresql/setwithdrawalrequests.go +++ b/services/chaindb/postgresql/setwithdrawalrequests.go @@ -48,8 +48,8 @@ func (s *Service) SetWithdrawalRequests(ctx context.Context, requests []*chaindb "f_validator_pubkey", "f_amount", }, - pgx.CopyFromSlice(len(requests), func(i int) ([]interface{}, error) { - return []interface{}{ + pgx.CopyFromSlice(len(requests), func(i int) ([]any, error) { + return []any{ requests[i].InclusionBlockRoot[:], requests[i].InclusionSlot, requests[i].InclusionIndex, diff --git a/services/chaindb/postgresql/validatorepochsummaries.go b/services/chaindb/postgresql/validatorepochsummaries.go index 02c19ba..a3de808 100644 --- a/services/chaindb/postgresql/validatorepochsummaries.go +++ b/services/chaindb/postgresql/validatorepochsummaries.go @@ -218,21 +218,21 @@ FROM t_validator_epoch_summaries`) if filter.From != nil { queryVals = append(queryVals, 
*filter.From) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_epoch >= $%d`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_epoch >= $%d`, wherestr, len(queryVals)) wherestr = " AND" } if filter.To != nil { queryVals = append(queryVals, *filter.To) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_epoch <= $%d`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_epoch <= $%d`, wherestr, len(queryVals)) } if filter.ValidatorIndices != nil && len(*filter.ValidatorIndices) > 0 { queryVals = append(queryVals, *filter.ValidatorIndices) - queryBuilder.WriteString(fmt.Sprintf(` -%s f_validator_index = ANY($%d)`, wherestr, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +%s f_validator_index = ANY($%d)`, wherestr, len(queryVals)) } switch filter.Order { @@ -248,8 +248,8 @@ ORDER BY f_epoch DESC,f_validator_index DESC`) if filter.Limit > 0 { queryVals = append(queryVals, filter.Limit) - queryBuilder.WriteString(fmt.Sprintf(` -LIMIT $%d`, len(queryVals))) + fmt.Fprintf(&queryBuilder, ` +LIMIT $%d`, len(queryVals)) } if e := log.Trace(); e.Enabled() { @@ -517,7 +517,7 @@ WHERE f_validator_index = $1 return summary, nil } -// PruneValidatorEpochSummaries prunes validator epoch summaries up to (but not including) the given point. +// PruneValidatorEpochSummaries prunes validator epoch summaries up to and including the given epoch. func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error { ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorEpochSummaries") defer span.End() diff --git a/services/chaindb/postgresql/validators.go b/services/chaindb/postgresql/validators.go index fa55980..d0efe2e 100644 --- a/services/chaindb/postgresql/validators.go +++ b/services/chaindb/postgresql/validators.go @@ -17,7 +17,7 @@ import ( "context" "database/sql" "fmt" - "sort" + "slices" "strings" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -520,9 +520,7 @@ func (s *Service) ValidatorBalancesByIndexAndEpochRange( } // Sort the validator indices. - sort.Slice(validatorIndices, func(i, j int) bool { - return validatorIndices[i] < validatorIndices[j] - }) + slices.Sort(validatorIndices) // Create a matrix of the values we require. This allows the database to fill in the blanks when it doesn't have a balance for // the required (index,epoch) tuple (for example when the balance is 0). @@ -595,9 +593,7 @@ func (s *Service) ValidatorBalancesByIndexAndEpochs( } // Sort the validator indices. - sort.Slice(validatorIndices, func(i, j int) bool { - return validatorIndices[i] < validatorIndices[j] - }) + slices.Sort(validatorIndices) // Create a matrix of the values we require. This allows the database to fill in the blanks when it doesn't have a balance for // the required (index,epoch) tuple (for example when the balance is 0). @@ -707,7 +703,7 @@ func validatorBalanceFromRow(rows pgx.Rows) (*chaindb.ValidatorBalance, error) { return validatorBalance, nil } -// PruneValidatorBalances prunes validator balances up to (but not including) the given epoch. +// PruneValidatorBalances prunes validator balances up to and including the given epoch. 
func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error { ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorBalances") defer span.End() diff --git a/services/chaindb/service.go b/services/chaindb/service.go index b013438..1c951ee 100644 --- a/services/chaindb/service.go +++ b/services/chaindb/service.go @@ -384,7 +384,7 @@ type AggregateValidatorBalancesProvider interface { // ValidatorBalancesPruner defines functions to prune validator balances. type ValidatorBalancesPruner interface { - // PruneValidatorBalances prunes validator balances up to (but not including) the given epoch. + // PruneValidatorBalances prunes validator balances up to and including the given epoch. PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error } @@ -451,7 +451,7 @@ type ValidatorEpochSummariesProvider interface { // ValidatorEpochSummariesPruner defines functions to prune validator epoch summaries. type ValidatorEpochSummariesPruner interface { - // PruneValidatorEpochSummaries prunes validator epoch summaries up to (but not including) the given point. + // PruneValidatorEpochSummaries prunes validator epoch summaries up to and including the given epoch. PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error } diff --git a/services/chaintime/standard/service.go b/services/chaintime/standard/service.go index 96911b8..463e87c 100644 --- a/services/chaintime/standard/service.go +++ b/services/chaintime/standard/service.go @@ -228,10 +228,7 @@ func (s *Service) TimestampToEpoch(timestamp time.Time) phase0.Epoch { // FirstEpochOfSyncPeriod provides the first epoch of the given sync period. // Note that epochs before the sync committee period will provide the Altair hard fork epoch. func (s *Service) FirstEpochOfSyncPeriod(period uint64) phase0.Epoch { - epoch := phase0.Epoch(period * s.epochsPerSyncCommitteePeriod) - if epoch < s.altairForkEpoch { - epoch = s.altairForkEpoch - } + epoch := max(phase0.Epoch(period*s.epochsPerSyncCommitteePeriod), s.altairForkEpoch) return epoch } diff --git a/services/eth1deposits/getlogs/service.go b/services/eth1deposits/getlogs/service.go index 3ac565b..810a7ec 100644 --- a/services/eth1deposits/getlogs/service.go +++ b/services/eth1deposits/getlogs/service.go @@ -218,10 +218,7 @@ func (s *Service) parseNewBlocks(ctx context.Context, md *metadata) { log.Trace().Uint64("start_block", md.LatestBlock+1).Uint64("end_block", latestHeadBlock).Msg("Fetching ETH1 logs in batches") for block := md.LatestBlock + 1; block <= latestHeadBlock; block += s.blocksPerRequest { startBlock := block - endBlock := block + s.blocksPerRequest - 1 - if endBlock > latestHeadBlock { - endBlock = latestHeadBlock - } + endBlock := min(block+s.blocksPerRequest-1, latestHeadBlock) log := log.With().Uint64("start_block", startBlock).Uint64("end_block", endBlock).Logger() // Each update goes in to its own transaction, to make the data available sooner. diff --git a/services/finalizer/standard/metadata.go b/services/finalizer/standard/metadata.go index 548fccb..6185180 100644 --- a/services/finalizer/standard/metadata.go +++ b/services/finalizer/standard/metadata.go @@ -22,9 +22,12 @@ import ( // metadata stored about this service. 
type metadata struct { - LastFinalizedEpoch int64 `json:"latest_epoch"` - LatestCanonicalSlot int64 `json:"latest_canonical_slot"` - MissedEpochs []int64 `json:"missed_epochs,omitempty"` + LastFinalizedEpoch int64 `json:"latest_epoch"` + LatestCanonicalSlot int64 `json:"latest_canonical_slot"` + // Deprecated: never populated. Residue from an abandoned gap-tracking + // design. Retained for JSON backward-compatibility with t_metadata rows + // persisted by older builds. + MissedEpochs []int64 `json:"missed_epochs,omitempty"` } // metadataKey is the key for the metadata. diff --git a/services/proposerduties/standard/metadata.go b/services/proposerduties/standard/metadata.go index 16b6b1d..fc3827b 100644 --- a/services/proposerduties/standard/metadata.go +++ b/services/proposerduties/standard/metadata.go @@ -23,7 +23,10 @@ import ( // metadata stored about this service. type metadata struct { - LatestEpoch int64 `json:"latest_epoch"` + LatestEpoch int64 `json:"latest_epoch"` + // Deprecated: never populated. The handleMissed consumer reads from a + // permanently-empty list and is dormant. Retained for JSON + // backward-compatibility with t_metadata rows persisted by older builds. MissedEpochs []phase0.Epoch `json:"missed_epochs,omitempty"` } diff --git a/services/scheduler/standard/service_test.go b/services/scheduler/standard/service_test.go index b9152ff..0434100 100644 --- a/services/scheduler/standard/service_test.go +++ b/services/scheduler/standard/service_test.go @@ -449,7 +449,7 @@ func TestManyJobs(t *testing.T) { runTime := time.Now().Add(200 * time.Millisecond) jobs := 2048 - for i := 0; i < jobs; i++ { + for i := range jobs { require.NoError(t, s.ScheduleJob(ctx, "Test", fmt.Sprintf("Job instance %d", i), runTime, runFunc, nil)) } require.Len(t, s.ListJobs(ctx), jobs) @@ -579,7 +579,7 @@ func TestMulti(t *testing.T) { var runWG sync.WaitGroup var setupWG sync.WaitGroup starter := make(chan any) - for i := 0; i < 32; i++ { + for range 32 { setupWG.Add(1) runWG.Add(1) go func() { diff --git a/services/summarizer/standard/epoch.go b/services/summarizer/standard/epoch.go index 48f15f4..f2c74ea 100644 --- a/services/summarizer/standard/epoch.go +++ b/services/summarizer/standard/epoch.go @@ -16,7 +16,7 @@ package standard import ( "context" "fmt" - "sort" + "slices" "time" "github.com/attestantio/go-eth2-client/spec/electra" @@ -76,8 +76,11 @@ func (s *Service) summarizeEpoch(ctx context.Context, return false, errors.Wrap(err, "failed to obtain validator balances") } if len(balances) == 0 { - // This can happen if chaind does not have validator balances enabled, or has not yet obtained - // the balances. We return false but no error. + // Bootstrap path: t_validators not yet populated, so the LEFT JOIN in + // ValidatorBalancesByEpoch returns no rows. Production state is + // covered by the post-accumulation guard below. The message is the + // alert-rule contract; changing the text breaks the alert. + log.Warn().Msg("No validator balances available; cannot summarize epoch (will retry on next finality tick)") return false, nil } // Make a balances map. @@ -97,6 +100,15 @@ func (s *Service) summarizeEpoch(ctx context.Context, } log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained validator balances") + // Production-state guard: ValidatorBalancesByEpoch's LEFT JOIN with + // COALESCE(f_balance, 0) returns one zero-balance row per validator when + // no balance rows exist for the epoch, so the len() check above cannot + // catch this shape. Reuse the alert-rule message text.
+ if summary.ActiveValidators > 0 && summary.ActiveBalance == 0 {
+ log.Warn().Msg("No validator balances available; cannot summarize epoch (will retry on next finality tick)")
+ return false, nil
+ }
+
 err = s.blockStatsForEpoch(ctx, epoch, summary)
 if err != nil {
 return false, errors.Wrap(err, "failed to calculate block summary statistics for epoch")
 }
@@ -438,8 +450,8 @@ func (s *Service) attesterSlashingStatsForSlotRange(ctx context.Context,

 // intersection returns a list of items common between the two sets.
 func intersection(set1 []phase0.ValidatorIndex, set2 []phase0.ValidatorIndex) []phase0.ValidatorIndex {
- sort.Slice(set1, func(i, j int) bool { return set1[i] < set1[j] })
- sort.Slice(set2, func(i, j int) bool { return set2[i] < set2[j] })
+ slices.Sort(set1)
+ slices.Sort(set2)

 res := make([]phase0.ValidatorIndex, 0)
 set1Pos := 0
diff --git a/services/summarizer/standard/handler.go b/services/summarizer/standard/handler.go
index fc7f48f..11fc6ec 100644
--- a/services/summarizer/standard/handler.go
+++ b/services/summarizer/standard/handler.go
@@ -47,6 +47,9 @@ func (s *Service) OnFinalityUpdated(
 return
 }
 defer s.activitySem.Release(1)
+ // Registered after the activitySem.Release defer above, so LIFO
+ // ordering runs the gauge update first, while the semaphore is still held.
+ defer s.updateLagGauges(ctx, finalizedEpoch)

 if finalizedEpoch == 0 {
 log.Debug().Msg("Not summarizing on epoch 0")
@@ -130,7 +133,8 @@ func (s *Service) summarizeEpochs(ctx context.Context, targetEpoch phase0.Epoch)
 return errors.Wrapf(err, "failed to update summary for epoch %d", epoch)
 }
 if !updated {
- log.Debug().Uint64("epoch", uint64(epoch)).Msg("Not enough data to update summary")
+ // Alert-rule contract — do not change message text.
+ log.Warn().Uint64("epoch", uint64(epoch)).Msg("Not enough data to update summary; will retry on next finality tick")
 return nil
 }
 }
@@ -270,3 +274,49 @@ func (s *Service) summarizeValidatorDays(ctx context.Context) error {
 func (s *Service) epochsPerDay() phase0.Epoch {
 return phase0.Epoch(86400.0 / s.chainTime.SlotDuration().Seconds() / float64(s.chainTime.SlotsPerEpoch()))
 }
+
+// updateLagGauges refreshes the per-pipeline lag gauges from current
+// metadata. The pure-compute half is split into setLagGauges for testing.
+func (s *Service) updateLagGauges(ctx context.Context, finalizedEpoch phase0.Epoch) {
+ if finalizedEpoch == 0 {
+ return
+ }
+ md, err := s.getMetadata(ctx)
+ if err != nil {
+ log.Debug().Err(err).Msg("Failed to obtain metadata for lag gauges")
+ return
+ }
+ upstream, err := s.getUpstreamMetadata(ctx)
+ if err != nil {
+ log.Debug().Err(err).Msg("Failed to obtain upstream metadata for lag gauges")
+ return
+ }
+ s.setLagGauges(md, upstream)
+}
+
+// setLagGauges sets per-pipeline lag as a cursor diff against direct upstream:
+//
+// - epoch = validators.LatestBalancesEpoch - summarizer.LastEpoch
+// - block = summarizer.LastEpoch - summarizer.LastBlockEpoch
+// - validator = summarizer.LastEpoch - summarizer.LastValidatorEpoch
+func (s *Service) setLagGauges(md *metadata, upstream *upstreamMetadata) {
+ if s.epochSummaries {
+ monitorLag("epoch", clampLag(upstream.LatestBalancesEpoch, md.LastEpoch))
+ }
+ if s.blockSummaries {
+ monitorLag("block", clampLag(md.LastEpoch, md.LastBlockEpoch))
+ }
+ if s.validatorSummaries {
+ monitorLag("validator", clampLag(md.LastEpoch, md.LastValidatorEpoch))
+ }
+}
+
+// clampLag returns upstream - downstream as a non-negative float64.
uint64 +// subtraction would underflow to ~1.8e19 if cursors transiently invert across +// metadata reads, breaking alerts. +func clampLag(upstream, downstream phase0.Epoch) float64 { + if downstream >= upstream { + return 0 + } + return float64(upstream - downstream) +} diff --git a/services/summarizer/standard/handler_internal_test.go b/services/summarizer/standard/handler_internal_test.go new file mode 100644 index 0000000..16ccebc --- /dev/null +++ b/services/summarizer/standard/handler_internal_test.go @@ -0,0 +1,351 @@ +// Copyright © 2021 - 2026 Weald Technology Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standard + +import ( + "bytes" + "context" + "strings" + "testing" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/wealdtech/chaind/services/chaindb" +) + +// Tests in this file mutate package-level state — the unexported `log` variable +// and the `summarizerLagEpochs` gauge — and restore the originals on teardown. +// Do NOT add t.Parallel() to any test here; the global mutation would race +// across goroutines. + +// stubValidatorsProvider drives summarizeEpoch into the silent-skip and +// zero-balance branches. +type stubValidatorsProvider struct { + validators []*chaindb.Validator + balances []*chaindb.ValidatorBalance +} + +func (s *stubValidatorsProvider) Validators(_ context.Context) ([]*chaindb.Validator, error) { + return s.validators, nil +} + +func (s *stubValidatorsProvider) ValidatorsByPublicKey(_ context.Context, _ []phase0.BLSPubKey) (map[phase0.BLSPubKey]*chaindb.Validator, error) { + return nil, nil +} + +func (s *stubValidatorsProvider) ValidatorsByIndex(_ context.Context, _ []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]*chaindb.Validator, error) { + return nil, nil +} + +func (s *stubValidatorsProvider) ValidatorBalancesByEpoch(_ context.Context, _ phase0.Epoch) ([]*chaindb.ValidatorBalance, error) { + return s.balances, nil +} + +func (s *stubValidatorsProvider) ValidatorBalancesByIndexAndEpoch(_ context.Context, _ []phase0.ValidatorIndex, _ phase0.Epoch) (map[phase0.ValidatorIndex]*chaindb.ValidatorBalance, error) { + return nil, nil +} + +func (s *stubValidatorsProvider) ValidatorBalancesByIndexAndEpochRange(_ context.Context, _ []phase0.ValidatorIndex, _ phase0.Epoch, _ phase0.Epoch) (map[phase0.ValidatorIndex][]*chaindb.ValidatorBalance, error) { + return nil, nil +} + +func (s *stubValidatorsProvider) ValidatorBalancesByIndexAndEpochs(_ context.Context, _ []phase0.ValidatorIndex, _ []phase0.Epoch) (map[phase0.ValidatorIndex][]*chaindb.ValidatorBalance, error) { + return nil, nil +} + +func TestSummarizeEpochSilentSkipEmitsWarn(t *testing.T) { + ctx := context.Background() + + // main_test.go sets GlobalLevel to Disabled; raise it so the buffer captures warns. 
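+ // zerolog gates each event on both the logger's own level and the global
+ // level, so the Warn below needs the global gate opened as well.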
+ originalGlobalLevel := zerolog.GlobalLevel() + zerolog.SetGlobalLevel(zerolog.TraceLevel) + defer zerolog.SetGlobalLevel(originalGlobalLevel) + + var buf bytes.Buffer + originalLog := log + log = zerolog.New(&buf).Level(zerolog.WarnLevel) + defer func() { log = originalLog }() + + const epoch = phase0.Epoch(5) + farFuture := phase0.Epoch(0xffffffffffffffff) + + s := &Service{ + farFutureEpoch: farFuture, + epochSummaries: true, + validatorsProvider: &stubValidatorsProvider{ + validators: []*chaindb.Validator{ + { + Index: 1, + ActivationEpoch: 0, + ExitEpoch: farFuture, + }, + }, + balances: nil, + }, + } + + updated, err := s.summarizeEpoch(ctx, &metadata{}, epoch) + require.NoError(t, err) + require.False(t, updated) + + logged := buf.String() + require.True(t, + strings.Contains(logged, "No validator balances available; cannot summarize epoch"), + "warn log missing expected stable message text; got: %s", logged) + require.True(t, + strings.Contains(logged, `"level":"warn"`), + "silent-skip log emitted at unexpected level; got: %s", logged) +} + +// recordingChainDB records every SetEpochSummary call so tests can assert +// the corrupt-summary guard prevents writes. +type recordingChainDB struct { + summariesSet []*chaindb.EpochSummary +} + +func (c *recordingChainDB) BeginTx(ctx context.Context) (context.Context, context.CancelFunc, error) { + return ctx, func() {}, nil +} + +func (c *recordingChainDB) CommitTx(_ context.Context) error { + return nil +} + +func (c *recordingChainDB) BeginROTx(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (c *recordingChainDB) CommitROTx(_ context.Context) {} + +func (c *recordingChainDB) SetMetadata(_ context.Context, _ string, _ []byte) error { + return nil +} + +func (c *recordingChainDB) Metadata(_ context.Context, _ string) ([]byte, error) { + return nil, nil +} + +func (c *recordingChainDB) SetEpochSummary(_ context.Context, summary *chaindb.EpochSummary) error { + c.summariesSet = append(c.summariesSet, summary) + return nil +} + +// TestSummarizeEpochZeroBalanceAssertion drives the production-state shape +// (LEFT JOIN with all-zero rows) and asserts the guard refuses to write. 
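+// The 1000-row fixture reproduces the incident shape: every validator is
+// known and every balance row is zero, so the len() check passes and only
+// the post-accumulation guard can refuse the write.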
+func TestSummarizeEpochZeroBalanceAssertion(t *testing.T) { + ctx := context.Background() + + originalGlobalLevel := zerolog.GlobalLevel() + zerolog.SetGlobalLevel(zerolog.TraceLevel) + defer zerolog.SetGlobalLevel(originalGlobalLevel) + + var buf bytes.Buffer + originalLog := log + log = zerolog.New(&buf).Level(zerolog.WarnLevel) + defer func() { log = originalLog }() + + const epoch = phase0.Epoch(5) + const numValidators = 1000 + farFuture := phase0.Epoch(0xffffffffffffffff) + + validators := make([]*chaindb.Validator, numValidators) + balances := make([]*chaindb.ValidatorBalance, numValidators) + for i := range numValidators { + idx := phase0.ValidatorIndex(i) + validators[i] = &chaindb.Validator{ + Index: idx, + ActivationEpoch: 0, + ExitEpoch: farFuture, + } + balances[i] = &chaindb.ValidatorBalance{ + Index: idx, + Epoch: epoch, + Balance: 0, + EffectiveBalance: 0, + } + } + + chainDB := &recordingChainDB{} + s := &Service{ + chainDB: chainDB, + farFutureEpoch: farFuture, + epochSummaries: true, + validatorsProvider: &stubValidatorsProvider{ + validators: validators, + balances: balances, + }, + } + + updated, err := s.summarizeEpoch(ctx, &metadata{}, epoch) + require.NoError(t, err) + require.False(t, updated) + require.Empty(t, chainDB.summariesSet, + "corrupt summary written to t_epoch_summaries despite zero-balance guard") + + logged := buf.String() + require.True(t, + strings.Contains(logged, "No validator balances available; cannot summarize epoch"), + "warn log missing expected stable message text; got: %s", logged) + require.True(t, + strings.Contains(logged, `"level":"warn"`), + "zero-balance log emitted at unexpected level; got: %s", logged) +} + +// TestSetLagGaugesWiring asserts per-pipeline cursor-diff math, disabled- +// pipeline absence, and clamp-to-zero behavior. 
+func TestSetLagGaugesWiring(t *testing.T) { + originalGauge := summarizerLagEpochs + summarizerLagEpochs = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "lag_epochs", + Help: "test", + }, []string{"pipeline"}) + defer func() { summarizerLagEpochs = originalGauge }() + + tests := []struct { + name string + service *Service + md *metadata + upstream *upstreamMetadata + wantSeries int + wantValues map[string]float64 + }{ + { + name: "all caught up", + service: &Service{ + epochSummaries: true, + blockSummaries: true, + validatorSummaries: true, + }, + md: &metadata{ + LastEpoch: 10, + LastBlockEpoch: 10, + LastValidatorEpoch: 10, + }, + upstream: &upstreamMetadata{LatestBalancesEpoch: 10}, + wantSeries: 3, + wantValues: map[string]float64{ + "epoch": 0, + "block": 0, + "validator": 0, + }, + }, + { + name: "upstream ahead of summarizer", + service: &Service{ + epochSummaries: true, + blockSummaries: true, + validatorSummaries: true, + }, + md: &metadata{ + LastEpoch: 7, + LastBlockEpoch: 7, + LastValidatorEpoch: 7, + }, + upstream: &upstreamMetadata{LatestBalancesEpoch: 10}, + wantSeries: 3, + wantValues: map[string]float64{ + "epoch": 3, // 10 - 7 + "block": 0, // summarizer epoch == sub-pipelines + "validator": 0, + }, + }, + { + name: "intra-summarizer lag (block + validator behind epoch)", + service: &Service{ + epochSummaries: true, + blockSummaries: true, + validatorSummaries: true, + }, + md: &metadata{ + LastEpoch: 10, + LastBlockEpoch: 5, + LastValidatorEpoch: 4, + }, + upstream: &upstreamMetadata{LatestBalancesEpoch: 10}, + wantSeries: 3, + wantValues: map[string]float64{ + "epoch": 0, // 10 - 10 + "block": 5, // 10 - 5 + "validator": 6, // 10 - 4 + }, + }, + { + name: "race window — summarizer briefly ahead of upstream", + service: &Service{ + epochSummaries: true, + blockSummaries: true, + validatorSummaries: true, + }, + md: &metadata{ + LastEpoch: 12, + LastBlockEpoch: 12, + LastValidatorEpoch: 12, + }, + upstream: &upstreamMetadata{LatestBalancesEpoch: 10}, + wantSeries: 3, + wantValues: map[string]float64{ + "epoch": 0, // clamped, not 1.8e19 + "block": 0, + "validator": 0, + }, + }, + { + name: "block pipeline disabled", + service: &Service{ + epochSummaries: true, + blockSummaries: false, + validatorSummaries: true, + }, + md: &metadata{ + LastEpoch: 9, + LastValidatorEpoch: 8, + }, + upstream: &upstreamMetadata{LatestBalancesEpoch: 11}, + wantSeries: 2, + wantValues: map[string]float64{ + "epoch": 2, // 11 - 9 + "validator": 1, // 9 - 8 + }, + }, + { + name: "no pipelines enabled", + service: &Service{}, + md: &metadata{}, + upstream: &upstreamMetadata{}, + wantSeries: 0, + wantValues: map[string]float64{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + summarizerLagEpochs.Reset() + + test.service.setLagGauges(test.md, test.upstream) + + require.Equal(t, test.wantSeries, testutil.CollectAndCount(summarizerLagEpochs), + "unexpected number of label series emitted") + + for label, want := range test.wantValues { + got := testutil.ToFloat64(summarizerLagEpochs.WithLabelValues(label)) + require.Equal(t, want, got, "unexpected value for pipeline=%s", label) + require.GreaterOrEqual(t, got, 0.0, "lag for pipeline=%s should be non-negative", label) + } + }) + } +} diff --git a/services/summarizer/standard/main_test.go b/services/summarizer/standard/main_test.go index 6778a57..5560313 100644 --- a/services/summarizer/standard/main_test.go +++ b/services/summarizer/standard/main_test.go @@ -14,6 +14,7 @@ 
package standard_test import ( + "flag" "os" "testing" @@ -22,8 +23,13 @@ import ( func TestMain(m *testing.M) { zerolog.SetGlobalLevel(zerolog.Disabled) - if os.Getenv("CHAINDB_URL") != "" && - os.Getenv("ETH2CLIENT_ADDRESS") != "" { - os.Exit(m.Run()) + // `package standard` (internal) and `package standard_test` (external) tests + // share a single test binary, so this TestMain governs both. When the env + // vars required by service_test.go's TestService are absent we skip that + // test by name and still call m.Run(), so internal-package unit tests like + // the lag-gauge wiring tests run unconditionally under `go test ./...`. + if os.Getenv("CHAINDB_URL") == "" || os.Getenv("ETH2CLIENT_ADDRESS") == "" { + _ = flag.Set("test.skip", "^TestService$") } + os.Exit(m.Run()) } diff --git a/services/summarizer/standard/metadata.go b/services/summarizer/standard/metadata.go index bdef89a..3bcfaa0 100644 --- a/services/summarizer/standard/metadata.go +++ b/services/summarizer/standard/metadata.go @@ -82,3 +82,40 @@ type oldmetadata struct { LastValidatorDay int64 `json:"last_validator_day"` PeriodicValidatorRollups bool `json:"periodic_validator_rollups"` } + +// upstreamMetadataKey mirrors validators.standard's metadata key to keep the +// summarizer decoupled from the validators package. +const upstreamMetadataKey = "validators.standard" + +// upstreamMetadata captures the subset of validators.standard metadata the +// lag gauge consumes. +type upstreamMetadata struct { + LatestBalancesEpoch phase0.Epoch `json:"latest_balances_epoch"` +} + +// oldUpstreamMetadata is the pre-0.8.8 unquoted-int format. +type oldUpstreamMetadata struct { + LatestBalancesEpoch uint64 `json:"latest_balances_epoch"` +} + +// getUpstreamMetadata reads validators.standard metadata. Returns a zero +// struct when the row is absent (bootstrap state). +func (s *Service) getUpstreamMetadata(ctx context.Context) (*upstreamMetadata, error) { + md := &upstreamMetadata{} + mdJSON, err := s.chainDB.Metadata(ctx, upstreamMetadataKey) + if err != nil { + return nil, errors.Wrap(err, "failed to fetch upstream validators metadata") + } + if mdJSON == nil { + return md, nil + } + if err := json.Unmarshal(mdJSON, md); err != nil { + // Try the old format. + omd := &oldUpstreamMetadata{} + if err := json.Unmarshal(mdJSON, omd); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal upstream validators metadata") + } + md.LatestBalancesEpoch = phase0.Epoch(omd.LatestBalancesEpoch) + } + return md, nil +} diff --git a/services/summarizer/standard/metrics.go b/services/summarizer/standard/metrics.go index f6efe37..17b69ca 100644 --- a/services/summarizer/standard/metrics.go +++ b/services/summarizer/standard/metrics.go @@ -41,6 +41,11 @@ var ( lastBalancePrune prometheus.Gauge ) +// summarizerLagEpochs records how many epochs each pipeline trails its +// upstream cursor. Disabled pipelines never call WithLabelValues, so their +// series are absent from /metrics. Alert-rule contract — do not rename. +var summarizerLagEpochs *prometheus.GaugeVec + func registerMetrics(_ context.Context, monitor metrics.Service) error { if latestEpoch != nil { // Already registered. 
@@ -112,9 +117,27 @@ func registerPrometheusMetrics() error { return errors.Wrap(err, "failed to register epoch_prune_ts") } + summarizerLagEpochs = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "lag_epochs", + Help: "Number of epochs by which each summarizer pipeline lags its direct upstream cursor", + }, []string{"pipeline"}) + if err := prometheus.Register(summarizerLagEpochs); err != nil { + return errors.Wrap(err, "failed to register lag_epochs") + } + return nil } +// monitorLag records the lag in epochs for a given summarizer pipeline. +// Callers are responsible for guarding with the per-pipeline enabled flag, so +// disabled pipelines never produce a series. +func monitorLag(pipeline string, lag float64) { + if summarizerLagEpochs != nil { + summarizerLagEpochs.WithLabelValues(pipeline).Set(lag) + } +} + // monitorLatestEpoch sets the latest epoch without registering an // increase in epochs processed. This does not usually need to be // called directly, as it is called as part of monitorEpochProcessed. diff --git a/services/summarizer/standard/validatorday.go b/services/summarizer/standard/validatorday.go index 4e792be..ae4db6d 100644 --- a/services/summarizer/standard/validatorday.go +++ b/services/summarizer/standard/validatorday.go @@ -225,8 +225,11 @@ func (s *Service) addValidatorBalanceSummaries(ctx context.Context, if err != nil { return false, errors.Wrap(err, "failed to obtain validator start epoch balances") } - if len(startBalances) == 0 { - // Balances are not yet present. + if !validatorBalancesPresent(startBalances) { + // Same dual-shape guard as epoch.go: bootstrap empty slice or LEFT + // JOIN all-zero rows. Keep the "epoch" wording verbatim for the + // alert rule; start_time/end_time disambiguate the day layer. + log.Warn().Time("start_time", startTime).Time("end_time", endTime).Msg("No validator balances available; cannot summarize epoch (will retry on next finality tick)") return false, nil } for _, startBalance := range startBalances { @@ -281,8 +284,9 @@ func (s *Service) addValidatorBalanceSummaries(ctx context.Context, if err != nil { return false, errors.Wrap(err, "failed to obtain validator end epoch balances") } - if len(endBalances) == 0 { - // Balances are not yet present. + if !validatorBalancesPresent(endBalances) { + // Same dual-shape guard as startBalances above. + log.Warn().Time("start_time", startTime).Time("end_time", endTime).Msg("No validator balances available; cannot summarize epoch (will retry on next finality tick)") return false, nil } for _, endBalance := range endBalances { @@ -418,3 +422,15 @@ func (s *Service) syncCommitteesForEpochs(ctx context.Context, return syncCommittees, nil } + +// validatorBalancesPresent reports whether the slice contains at least one +// non-zero row. An empty slice and a LEFT-JOIN-all-zero slice both report +// false, collapsing the bootstrap and production-state shapes into one guard. +func validatorBalancesPresent(balances []*chaindb.ValidatorBalance) bool { + for _, b := range balances { + if b.Balance != 0 || b.EffectiveBalance != 0 { + return true + } + } + return false +} diff --git a/services/summarizer/standard/validatorday_internal_test.go b/services/summarizer/standard/validatorday_internal_test.go new file mode 100644 index 0000000..5504eca --- /dev/null +++ b/services/summarizer/standard/validatorday_internal_test.go @@ -0,0 +1,210 @@ +// Copyright © 2026 Weald Technology Limited. 
+// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standard + +import ( + "bytes" + "context" + "strings" + "testing" + "time" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/wealdtech/chaind/services/chaindb" + mockchaintime "github.com/wealdtech/chaind/services/chaintime/mock" +) + +// Tests in this file mutate the package-level `log` variable and restore it on +// teardown. Do NOT add t.Parallel() to any test here — the global mutation +// would race across goroutines. + +// stubBalanceChainDB pops balance responses from a queue so one fixture can +// drive both the start-balances and end-balances guard paths. daySummariesSet +// records writes so tests can assert the guard short-circuits. +type stubBalanceChainDB struct { + balancesResponses [][]*chaindb.ValidatorBalance + balancesCallCount int + daySummariesSet [][]*chaindb.ValidatorDaySummary +} + +// chaindb.Service. +func (c *stubBalanceChainDB) BeginTx(ctx context.Context) (context.Context, context.CancelFunc, error) { + return ctx, func() {}, nil +} + +func (c *stubBalanceChainDB) CommitTx(_ context.Context) error { return nil } +func (c *stubBalanceChainDB) BeginROTx(ctx context.Context) (context.Context, error) { return ctx, nil } +func (c *stubBalanceChainDB) CommitROTx(_ context.Context) {} +func (c *stubBalanceChainDB) SetMetadata(_ context.Context, _ string, _ []byte) error { return nil } +func (c *stubBalanceChainDB) Metadata(_ context.Context, _ string) ([]byte, error) { return nil, nil } + +// chaindb.ValidatorsProvider. 
+func (c *stubBalanceChainDB) Validators(_ context.Context) ([]*chaindb.Validator, error) { + return nil, nil +} + +func (c *stubBalanceChainDB) ValidatorsByPublicKey(_ context.Context, _ []phase0.BLSPubKey) (map[phase0.BLSPubKey]*chaindb.Validator, error) { + return nil, nil +} + +func (c *stubBalanceChainDB) ValidatorsByIndex(_ context.Context, _ []phase0.ValidatorIndex) (map[phase0.ValidatorIndex]*chaindb.Validator, error) { + return nil, nil +} + +func (c *stubBalanceChainDB) ValidatorBalancesByEpoch(_ context.Context, _ phase0.Epoch) ([]*chaindb.ValidatorBalance, error) { + if c.balancesCallCount >= len(c.balancesResponses) { + return nil, nil + } + resp := c.balancesResponses[c.balancesCallCount] + c.balancesCallCount++ + return resp, nil +} + +func (c *stubBalanceChainDB) ValidatorBalancesByIndexAndEpoch(_ context.Context, _ []phase0.ValidatorIndex, _ phase0.Epoch) (map[phase0.ValidatorIndex]*chaindb.ValidatorBalance, error) { + return nil, nil +} + +func (c *stubBalanceChainDB) ValidatorBalancesByIndexAndEpochRange(_ context.Context, _ []phase0.ValidatorIndex, _ phase0.Epoch, _ phase0.Epoch) (map[phase0.ValidatorIndex][]*chaindb.ValidatorBalance, error) { + return nil, nil +} + +func (c *stubBalanceChainDB) ValidatorBalancesByIndexAndEpochs(_ context.Context, _ []phase0.ValidatorIndex, _ []phase0.Epoch) (map[phase0.ValidatorIndex][]*chaindb.ValidatorBalance, error) { + return nil, nil +} + +// chaindb.DepositsProvider. +func (c *stubBalanceChainDB) DepositsByPublicKey(_ context.Context, _ []phase0.BLSPubKey) (map[phase0.BLSPubKey][]*chaindb.Deposit, error) { + return nil, nil +} + +func (c *stubBalanceChainDB) DepositsForSlotRange(_ context.Context, _ phase0.Slot, _ phase0.Slot) ([]*chaindb.Deposit, error) { + return nil, nil +} + +// chaindb.WithdrawalsProvider. +func (c *stubBalanceChainDB) Withdrawals(_ context.Context, _ *chaindb.WithdrawalFilter) ([]*chaindb.Withdrawal, error) { + return nil, nil +} + +// chaindb.ValidatorDaySummariesSetter. +func (c *stubBalanceChainDB) SetValidatorDaySummary(_ context.Context, _ *chaindb.ValidatorDaySummary) error { + return nil +} + +func (c *stubBalanceChainDB) SetValidatorDaySummaries(_ context.Context, summaries []*chaindb.ValidatorDaySummary) error { + c.daySummariesSet = append(c.daySummariesSet, summaries) + return nil +} + +// TestAddValidatorBalanceSummariesZeroBalanceAssertion drives the LEFT-JOIN- +// all-zero shape into both startBalances and endBalances guard sites and +// asserts neither writes to t_validator_day_summaries. 
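+// The stub pops one queued response per ValidatorBalancesByEpoch call:
+// response 0 serves the day's start epoch, response 1 the end epoch, so a
+// single fixture can aim the zero rows at either guard site.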
+func TestAddValidatorBalanceSummariesZeroBalanceAssertion(t *testing.T) {
+ const numValidators = 1000
+
+ makeZeroBalances := func() []*chaindb.ValidatorBalance {
+ balances := make([]*chaindb.ValidatorBalance, numValidators)
+ for i := range numValidators {
+ balances[i] = &chaindb.ValidatorBalance{
+ Index: phase0.ValidatorIndex(i),
+ Balance: 0,
+ EffectiveBalance: 0,
+ }
+ }
+ return balances
+ }
+
+ makeNonZeroBalances := func() []*chaindb.ValidatorBalance {
+ balances := make([]*chaindb.ValidatorBalance, numValidators)
+ for i := range numValidators {
+ balances[i] = &chaindb.ValidatorBalance{
+ Index: phase0.ValidatorIndex(i),
+ Balance: 32_000_000_000,
+ EffectiveBalance: 32_000_000_000,
+ }
+ }
+ return balances
+ }
+
+ tests := []struct {
+ name string
+ balancesResponses [][]*chaindb.ValidatorBalance
+ }{
+ {
+ name: "startBalances all zero",
+ balancesResponses: [][]*chaindb.ValidatorBalance{
+ makeZeroBalances(),
+ },
+ },
+ {
+ name: "endBalances all zero",
+ balancesResponses: [][]*chaindb.ValidatorBalance{
+ makeNonZeroBalances(),
+ makeZeroBalances(),
+ },
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ ctx := context.Background()
+
+ // main_test.go sets GlobalLevel to Disabled; raise it so the buffer captures warns.
+ originalGlobalLevel := zerolog.GlobalLevel()
+ zerolog.SetGlobalLevel(zerolog.TraceLevel)
+ defer zerolog.SetGlobalLevel(originalGlobalLevel)
+
+ var buf bytes.Buffer
+ originalLog := log
+ log = zerolog.New(&buf).Level(zerolog.WarnLevel)
+ defer func() { log = originalLog }()
+
+ chainDB := &stubBalanceChainDB{
+ balancesResponses: test.balancesResponses,
+ }
+
+ s := &Service{
+ chainDB: chainDB,
+ chainTime: mockchaintime.New(),
+ }
+
+ daySummaries := make(map[phase0.ValidatorIndex]*chaindb.ValidatorDaySummary)
+ startTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
+ endTime := startTime.AddDate(0, 0, 1)
+
+ found, err := s.addValidatorBalanceSummaries(ctx, daySummaries, startTime, endTime)
+ require.NoError(t, err)
+ require.False(t, found,
+ "guard should refuse to compute a corrupt day summary when balances are all zero")
+ require.Empty(t, chainDB.daySummariesSet,
+ "no day summaries may be written on the guard path; a (false, nil) return means the parent summarizeValidatorsInDay must never reach SetValidatorDaySummaries")
+
+ logged := buf.String()
+ require.True(t,
+ strings.Contains(logged, "No validator balances available; cannot summarize epoch (will retry on next finality tick)"),
+ "warn log missing exact alert-contract message text; got: %s", logged)
+ require.True(t,
+ strings.Contains(logged, `"level":"warn"`),
+ "zero-balance log emitted at unexpected level; got: %s", logged)
+ require.True(t,
+ strings.Contains(logged, `"start_time"`),
+ "warn log missing structured start_time field for day-layer disambiguation; got: %s", logged)
+ require.True(t,
+ strings.Contains(logged, `"end_time"`),
+ "warn log missing structured end_time field for day-layer disambiguation; got: %s", logged)
+ })
+ }
+}
diff --git a/services/synccommittees/standard/parameters.go b/services/synccommittees/standard/parameters.go
index f8aae66..ac26bf7 100644
--- a/services/synccommittees/standard/parameters.go
+++ b/services/synccommittees/standard/parameters.go
@@ -110,11 +110,11 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
 }

 // Ensure the eth2client can handle our requirements.
if _, isProvider := parameters.eth2Client.(eth2client.SyncCommitteesProvider); !isProvider { - //nolint:stylecheck + //nolint:staticcheck return nil, errors.New("Ethereum 2 client does not provide sync committee information") // skipcq: SCC-ST1005 } if _, isProvider := parameters.eth2Client.(eth2client.EventsProvider); !isProvider { - //nolint:stylecheck + //nolint:staticcheck return nil, errors.New("Ethereum 2 client does not provide events") // skipcq: SCC-ST1005 } if parameters.chainDB == nil { diff --git a/services/validators/standard/handler.go b/services/validators/standard/handler.go index 2290b4f..1c1f527 100644 --- a/services/validators/standard/handler.go +++ b/services/validators/standard/handler.go @@ -28,6 +28,13 @@ import ( "go.opentelemetry.io/otel/trace" ) +// noValidatorBalancesMsg is the warn-log text emitted when the fetcher +// rejects an empty or all-zero beacon response. Alert-rule contract — both +// rejection sites must emit this exact string. See ADR 0002. +const noValidatorBalancesMsg = "Beacon returned no validator balances; cannot persist epoch (will retry on next finality tick)" + +var errNoValidatorBalances = errors.New("beacon returned no validator balances") + // OnBeaconChainHeadUpdated receives beacon chain head updated notifications. func (s *Service) OnBeaconChainHeadUpdated( ctx context.Context, @@ -181,8 +188,9 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context defer span.End() log := log.With().Uint64("epoch", uint64(epoch)).Logger() - stateID := fmt.Sprintf("%d", s.chainTime.FirstSlotOfEpoch(epoch)) - log.Trace().Uint64("slot", uint64(s.chainTime.FirstSlotOfEpoch(epoch))).Msg("Fetching validators") + firstSlot := s.chainTime.FirstSlotOfEpoch(epoch) + stateID := fmt.Sprintf("%d", firstSlot) + log.Trace().Uint64("slot", uint64(firstSlot)).Msg("Fetching validators") validatorsResponse, err := s.eth2Client.(eth2client.ValidatorsProvider).Validators(ctx, &api.ValidatorsOpts{ State: stateID, }) @@ -191,45 +199,57 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context } validators := validatorsResponse.Data + // Empty 200-OK response: refuse to advance so the next tick re-fetches. + // See ADR 0002 (no-empty-write contract). + if len(validators) == 0 { + log.Warn().Msg(noValidatorBalancesMsg) + return errNoValidatorBalances + } + span.AddEvent("Obtained validators", trace.WithAttributes( - attribute.Int("slot", int(s.chainTime.FirstSlotOfEpoch(epoch))), + attribute.Int("slot", int(firstSlot)), )) + dbValidatorBalances := make([]*chaindb.ValidatorBalance, 0, len(validators)) + for index, validator := range validators { + // Do not store 0 balances. + if validator.Balance == 0 { + continue + } + dbValidatorBalances = append(dbValidatorBalances, &chaindb.ValidatorBalance{ + Index: index, + Epoch: epoch, + Balance: validator.Balance, + EffectiveBalance: validator.Validator.EffectiveBalance, + }) + } + // All-zero beacon response amplified by the filter above into a zero-row + // insert. Refuse to advance. + if len(dbValidatorBalances) == 0 { + log.Warn().Msg(noValidatorBalancesMsg) + return errNoValidatorBalances + } + dbCtx, cancel, err := s.chainDB.BeginTx(ctx) if err != nil { return errors.Wrap(err, "failed to begin transaction for validator balances") } - if s.balances { - dbValidatorBalances := make([]*chaindb.ValidatorBalance, 0, len(validators)) - for index, validator := range validators { - // Do not store 0 balances. 
- if validator.Balance == 0 { - continue - } - dbValidatorBalances = append(dbValidatorBalances, &chaindb.ValidatorBalance{ - Index: index, - Epoch: epoch, - Balance: validator.Balance, - EffectiveBalance: validator.Validator.EffectiveBalance, - }) + if err := s.validatorsSetter.SetValidatorBalances(dbCtx, dbValidatorBalances); err != nil { + log.Trace().Err(err).Msg("Bulk insert failed; falling back to individual insert") + // This error will have caused the transaction to fail, so cancel it and start a new one. + cancel() + dbCtx, cancel, err = s.chainDB.BeginTx(ctx) + if err != nil { + return errors.Wrap(err, "failed to begin transaction for validator balances (2)") } - if err := s.validatorsSetter.SetValidatorBalances(dbCtx, dbValidatorBalances); err != nil { - log.Trace().Err(err).Msg("Bulk insert failed; falling back to individual insert") - // This error will have caused the transaction to fail, so cancel it and start a new one. - cancel() - dbCtx, cancel, err = s.chainDB.BeginTx(ctx) - if err != nil { - return errors.Wrap(err, "failed to begin transaction for validator balances (2)") - } - for _, dbValidatorBalance := range dbValidatorBalances { - if err := s.validatorsSetter.SetValidatorBalance(dbCtx, dbValidatorBalance); err != nil { - cancel() - return errors.Wrap(err, "failed to set validator balance") - } + for _, dbValidatorBalance := range dbValidatorBalances { + if err := s.validatorsSetter.SetValidatorBalance(dbCtx, dbValidatorBalance); err != nil { + cancel() + return errors.Wrap(err, "failed to set validator balance") } } - md.LatestBalancesEpoch = epoch } + md.LatestBalancesEpoch = epoch span.AddEvent("Updated validators") if err := s.setMetadata(dbCtx, md); err != nil { diff --git a/services/validators/standard/handler_internal_test.go b/services/validators/standard/handler_internal_test.go new file mode 100644 index 0000000..ab29d6c --- /dev/null +++ b/services/validators/standard/handler_internal_test.go @@ -0,0 +1,389 @@ +// Copyright © 2026 Weald Technology Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standard + +import ( + "bytes" + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/attestantio/go-eth2-client/api" + apiv1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/wealdtech/chaind/services/chaindb" +) + +// Tests in this file mutate the package-level `log` variable via captureWarnLog +// and restore it on teardown. Do NOT add t.Parallel() to any test here — the +// global mutation would race across goroutines. + +// captureWarnLog redirects the package logger to a byte buffer at WarnLevel +// and returns the buffer plus a restore func. 
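+// Typical usage in the tests below:
+//
+//	buf, restore := captureWarnLog(t)
+//	defer restore()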
+func captureWarnLog(t *testing.T) (*bytes.Buffer, func()) { + t.Helper() + originalGlobalLevel := zerolog.GlobalLevel() + zerolog.SetGlobalLevel(zerolog.TraceLevel) + + var buf bytes.Buffer + originalLog := log + log = zerolog.New(&buf).Level(zerolog.WarnLevel) + + return &buf, func() { + log = originalLog + zerolog.SetGlobalLevel(originalGlobalLevel) + } +} + +// stubEth2Client implements eth2client.Service + ValidatorsProvider; only +// Validators is exercised. +type stubEth2Client struct { + response *api.Response[map[phase0.ValidatorIndex]*apiv1.Validator] + err error + calls int +} + +func (s *stubEth2Client) Name() string { return "stub" } +func (s *stubEth2Client) Address() string { return "stub" } +func (s *stubEth2Client) IsActive() bool { return true } +func (s *stubEth2Client) IsSynced() bool { return true } + +func (s *stubEth2Client) Validators(_ context.Context, _ *api.ValidatorsOpts) ( + *api.Response[map[phase0.ValidatorIndex]*apiv1.Validator], error, +) { + s.calls++ + if s.err != nil { + return nil, s.err + } + return s.response, nil +} + +// recordingChainDB captures BeginTx/CommitTx ordering and SetMetadata payloads +// so tests can assert the cursor advanced and the transaction committed. +type recordingChainDB struct { + beginTxCalls int + commitTxCalls int + setMetadata [][]byte + cancelCalls int +} + +func (c *recordingChainDB) BeginTx(ctx context.Context) (context.Context, context.CancelFunc, error) { + c.beginTxCalls++ + return ctx, func() { c.cancelCalls++ }, nil +} + +func (c *recordingChainDB) CommitTx(_ context.Context) error { + c.commitTxCalls++ + return nil +} + +func (c *recordingChainDB) BeginROTx(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (c *recordingChainDB) CommitROTx(_ context.Context) {} + +func (c *recordingChainDB) SetMetadata(_ context.Context, _ string, value []byte) error { + dup := make([]byte, len(value)) + copy(dup, value) + c.setMetadata = append(c.setMetadata, dup) + return nil +} + +func (c *recordingChainDB) Metadata(_ context.Context, _ string) ([]byte, error) { + return nil, nil +} + +// recordingValidatorsSetter captures every SetValidatorBalances/SetValidatorBalance +// call so tests can assert what was persisted. +type recordingValidatorsSetter struct { + bulkCalls [][]*chaindb.ValidatorBalance + individualBalances []*chaindb.ValidatorBalance +} + +func (r *recordingValidatorsSetter) SetValidator(_ context.Context, _ *chaindb.Validator) error { + return nil +} + +func (r *recordingValidatorsSetter) SetValidatorBalance(_ context.Context, balance *chaindb.ValidatorBalance) error { + r.individualBalances = append(r.individualBalances, balance) + return nil +} + +func (r *recordingValidatorsSetter) SetValidatorBalances(_ context.Context, balances []*chaindb.ValidatorBalance) error { + dup := make([]*chaindb.ValidatorBalance, len(balances)) + copy(dup, balances) + r.bulkCalls = append(r.bulkCalls, dup) + return nil +} + +// stubChainTime satisfies chaintime.Service; only FirstSlotOfEpoch is invoked. 
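+// FirstSlotOfEpoch assumes the mainnet 32-slot epoch, so the handler's
+// stateID for epoch e resolves to slot 32*e.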
+type stubChainTime struct{} + +func (stubChainTime) GenesisTime() time.Time { return time.Time{} } +func (stubChainTime) SlotDuration() time.Duration { return 0 } +func (stubChainTime) SlotsPerEpoch() uint64 { return 32 } +func (stubChainTime) StartOfSlot(_ phase0.Slot) time.Time { return time.Time{} } +func (stubChainTime) StartOfEpoch(_ phase0.Epoch) time.Time { return time.Time{} } +func (stubChainTime) CurrentSlot() phase0.Slot { return 0 } +func (stubChainTime) CurrentEpoch() phase0.Epoch { return 0 } +func (stubChainTime) CurrentSyncCommitteePeriod() uint64 { return 0 } +func (stubChainTime) SlotToEpoch(_ phase0.Slot) phase0.Epoch { return 0 } +func (stubChainTime) SlotToSyncCommitteePeriod(_ phase0.Slot) uint64 { return 0 } +func (stubChainTime) EpochToSyncCommitteePeriod(_ phase0.Epoch) uint64 { return 0 } +func (stubChainTime) FirstSlotOfEpoch(epoch phase0.Epoch) phase0.Slot { + return phase0.Slot(uint64(epoch) * 32) +} +func (stubChainTime) LastSlotOfEpoch(_ phase0.Epoch) phase0.Slot { return 0 } +func (stubChainTime) TimestampToSlot(_ time.Time) phase0.Slot { return 0 } +func (stubChainTime) TimestampToEpoch(_ time.Time) phase0.Epoch { return 0 } +func (stubChainTime) FirstEpochOfSyncPeriod(_ uint64) phase0.Epoch { return 0 } +func (stubChainTime) AltairInitialEpoch() phase0.Epoch { return 0 } +func (stubChainTime) AltairInitialSyncCommitteePeriod() uint64 { return 0 } +func (stubChainTime) BellatrixInitialEpoch() phase0.Epoch { return 0 } +func (stubChainTime) CapellaInitialEpoch() phase0.Epoch { return 0 } + +// makeService wires the stubs into a *Service for the +// onEpochTransitionValidatorBalancesForEpoch path. +func makeService(eth2 *stubEth2Client, db *recordingChainDB, setter *recordingValidatorsSetter) *Service { + return &Service{ + eth2Client: eth2, + chainDB: db, + validatorsSetter: setter, + chainTime: stubChainTime{}, + balances: true, + } +} + +// Empty 200-OK response: cursor stays put, no transaction opens, warn-log +// fires. Regression boundary for ADR 0002. 
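+// Guard 1 (len(validators) == 0) must fire before BeginTx; each assertion
+// below pins one facet of that ordering.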
+func TestOnEpochTransitionValidatorBalancesForEpoch_EmptyValidatorsRejectedAndCursorUnchanged(t *testing.T) { + const epoch = phase0.Epoch(86823) + const startCursor = phase0.Epoch(86822) + + buf, restore := captureWarnLog(t) + defer restore() + + eth2 := &stubEth2Client{ + response: &api.Response[map[phase0.ValidatorIndex]*apiv1.Validator]{ + Data: map[phase0.ValidatorIndex]*apiv1.Validator{}, + }, + } + db := &recordingChainDB{} + setter := &recordingValidatorsSetter{} + s := makeService(eth2, db, setter) + + md := &metadata{LatestBalancesEpoch: startCursor} + + err := s.onEpochTransitionValidatorBalancesForEpoch(context.Background(), md, epoch) + + require.Error(t, err, + "empty 200-OK response must surface an error so the parent handler logs it") + require.Contains(t, err.Error(), "beacon returned no validator balances", + "error text must carry the stable contract substring used by alert rules") + + require.Equal(t, 1, eth2.calls, "beacon Validators must be called exactly once") + require.Equal(t, 0, db.beginTxCalls, + "guard 1 fires before BeginTx; no transaction may open on this path") + require.Equal(t, 0, db.commitTxCalls, + "no transaction may commit when the guard rejects the response") + require.Equal(t, 0, db.cancelCalls, + "no transaction was opened, so cancel must not be invoked") + + require.Empty(t, setter.bulkCalls, + "SetValidatorBalances must not be called when the response is empty") + require.Empty(t, setter.individualBalances, + "SetValidatorBalance must not be called when the response is empty") + require.Empty(t, db.setMetadata, + "SetMetadata must not be called when the response is empty") + + require.Equal(t, startCursor, md.LatestBalancesEpoch, + "cursor must NOT advance when the beacon returns no validators") + + logged := buf.String() + require.True(t, strings.Contains(logged, noValidatorBalancesMsg), + "warn log missing alert-contract message text; got: %s", logged) + require.True(t, strings.Contains(logged, `"level":"warn"`), + "empty-response log emitted at unexpected level; got: %s", logged) + require.True(t, strings.Contains(logged, `"epoch":86823`), + "warn log missing structured epoch field for operator disambiguation; got: %s", logged) +} + +// All-zero beacon response amplified by the "Do not store 0 balances" filter +// into a zero-row insert. Cursor stays put, no transaction opens, warn-log +// fires. 
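+// Unlike the empty-map case above, len(validators) here is 1000; it is the
+// zero filter that collapses the insert slice, exercising the second guard.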
+func TestOnEpochTransitionValidatorBalancesForEpoch_AllZeroBalancesRejectedAndCursorUnchanged(t *testing.T) { + const epoch = phase0.Epoch(86824) + const startCursor = phase0.Epoch(86823) + const numValidators = 1000 + + buf, restore := captureWarnLog(t) + defer restore() + + validators := make(map[phase0.ValidatorIndex]*apiv1.Validator, numValidators) + for i := range numValidators { + idx := phase0.ValidatorIndex(i) + validators[idx] = &apiv1.Validator{ + Index: idx, + Balance: 0, + Status: apiv1.ValidatorStateActiveOngoing, + Validator: &phase0.Validator{ + EffectiveBalance: 0, + }, + } + } + + eth2 := &stubEth2Client{ + response: &api.Response[map[phase0.ValidatorIndex]*apiv1.Validator]{ + Data: validators, + }, + } + db := &recordingChainDB{} + setter := &recordingValidatorsSetter{} + s := makeService(eth2, db, setter) + + md := &metadata{LatestBalancesEpoch: startCursor} + + err := s.onEpochTransitionValidatorBalancesForEpoch(context.Background(), md, epoch) + + require.Error(t, err, + "all-zero-balance response must surface an error so the parent handler logs it") + require.Contains(t, err.Error(), "beacon returned no validator balances", + "error text must carry the stable contract substring used by alert rules") + + require.Equal(t, 1, eth2.calls, "beacon Validators must be called exactly once") + require.Equal(t, 0, db.beginTxCalls, + "guard 2 fires before BeginTx; no transaction may open on this path") + require.Equal(t, 0, db.commitTxCalls, + "no transaction may commit when the guard rejects the filtered response") + require.Equal(t, 0, db.cancelCalls, + "no transaction was opened, so cancel must not be invoked") + + require.Empty(t, setter.bulkCalls, + "SetValidatorBalances must not be called when the filtered slice is empty") + require.Empty(t, setter.individualBalances, + "SetValidatorBalance must not be called when the filtered slice is empty") + require.Empty(t, db.setMetadata, + "SetMetadata must not be called when the cursor is not advanced") + + require.Equal(t, startCursor, md.LatestBalancesEpoch, + "cursor must NOT advance when the all-zero filter collapses the response to nothing") + + logged := buf.String() + require.True(t, strings.Contains(logged, noValidatorBalancesMsg), + "warn log missing alert-contract message text; got: %s", logged) + require.True(t, strings.Contains(logged, `"level":"warn"`), + "all-zero log emitted at unexpected level; got: %s", logged) + require.True(t, strings.Contains(logged, `"epoch":86824`), + "warn log missing structured epoch field for operator disambiguation; got: %s", logged) +} + +// Mixed zero/non-zero balances: filter drops zeros, function persists the +// non-zero half and advances the cursor. 
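+// Fixture arithmetic: 500 of the 1000 validators carry 32_000_000_000 Gwei
+// (32 ETH); the zero half must be filtered out and the rest persisted in a
+// single bulk call.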
+func TestOnEpochTransitionValidatorBalancesForEpoch_MixedBalancesPersistsNonZero(t *testing.T) { + const epoch = phase0.Epoch(100) + const numValidators = 1000 + const numNonZero = 500 + const nonZeroBalance = phase0.Gwei(32_000_000_000) + + validators := make(map[phase0.ValidatorIndex]*apiv1.Validator, numValidators) + for i := range numValidators { + idx := phase0.ValidatorIndex(i) + var balance phase0.Gwei + if i < numNonZero { + balance = nonZeroBalance + } + validators[idx] = &apiv1.Validator{ + Index: idx, + Balance: balance, + Status: apiv1.ValidatorStateActiveOngoing, + Validator: &phase0.Validator{ + EffectiveBalance: balance, + }, + } + } + + eth2 := &stubEth2Client{ + response: &api.Response[map[phase0.ValidatorIndex]*apiv1.Validator]{ + Data: validators, + }, + } + db := &recordingChainDB{} + setter := &recordingValidatorsSetter{} + s := makeService(eth2, db, setter) + + md := &metadata{LatestBalancesEpoch: epoch - 1} + + err := s.onEpochTransitionValidatorBalancesForEpoch(context.Background(), md, epoch) + + require.NoError(t, err, "mixed-balance response surfaces no error today") + require.Equal(t, 1, eth2.calls, "beacon Validators must be called exactly once") + require.Equal(t, 1, db.beginTxCalls, "must begin exactly one transaction") + require.Equal(t, 1, db.commitTxCalls, "transaction must commit") + require.Empty(t, setter.individualBalances, + "fallback per-row insert path must not be entered when bulk insert succeeds") + + require.Len(t, setter.bulkCalls, 1, "SetValidatorBalances must be called once") + persisted := setter.bulkCalls[0] + require.Len(t, persisted, numNonZero, + "only validators with Balance > 0 should be persisted") + for _, b := range persisted { + require.Equal(t, nonZeroBalance, b.Balance, + "every persisted row must carry the non-zero balance") + require.Equal(t, epoch, b.Epoch, + "every persisted row must be tagged with the requested epoch") + require.Less(t, uint64(b.Index), uint64(numNonZero), + "only validators with index < numNonZero have non-zero balances") + } + + require.Equal(t, epoch, md.LatestBalancesEpoch, + "cursor advances when at least some rows persist (expected today)") +} + +// Negative control: beacon error surfaces, no transaction opens, cursor +// stays put. 
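+// Pins the pre-existing error path so regressions there cannot be confused
+// with the new guard behavior.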
+func TestOnEpochTransitionValidatorBalancesForEpoch_BeaconErrorLeavesCursorUnchanged(t *testing.T) { + const epoch = phase0.Epoch(50) + const startCursor = phase0.Epoch(49) + + eth2 := &stubEth2Client{ + err: errors.New("beacon error"), + } + db := &recordingChainDB{} + setter := &recordingValidatorsSetter{} + s := makeService(eth2, db, setter) + + md := &metadata{LatestBalancesEpoch: startCursor} + + err := s.onEpochTransitionValidatorBalancesForEpoch(context.Background(), md, epoch) + + require.Error(t, err, "beacon error must surface to the caller") + require.Contains(t, err.Error(), "failed to obtain validators for validator balances", + "error wrapping at handler.go:190 must be preserved") + require.Contains(t, err.Error(), "beacon error", + "original beacon error must be preserved through the wrap") + + require.Equal(t, 1, eth2.calls, "beacon Validators must be called exactly once") + require.Equal(t, 0, db.beginTxCalls, "no transaction may begin when the beacon errors") + require.Equal(t, 0, db.commitTxCalls, "no transaction may commit when the beacon errors") + require.Empty(t, setter.bulkCalls, "SetValidatorBalances must not be called") + require.Empty(t, setter.individualBalances, "SetValidatorBalance must not be called") + require.Empty(t, db.setMetadata, "SetMetadata must not be called when the beacon errors") + require.Equal(t, startCursor, md.LatestBalancesEpoch, + "cursor must NOT advance on beacon error (negative control)") +} diff --git a/services/validators/standard/metadata.go b/services/validators/standard/metadata.go index 3988295..ca8e910 100644 --- a/services/validators/standard/metadata.go +++ b/services/validators/standard/metadata.go @@ -23,9 +23,12 @@ import ( // metadata stored about this service. type metadata struct { - LatestEpoch phase0.Epoch `json:"latest_epoch"` - LatestBalancesEpoch phase0.Epoch `json:"latest_balances_epoch"` - MissedEpochs []phase0.Epoch `json:"missed_epochs,omitempty"` + LatestEpoch phase0.Epoch `json:"latest_epoch"` + LatestBalancesEpoch phase0.Epoch `json:"latest_balances_epoch"` + // Deprecated: never populated. Residue from an abandoned gap-tracking + // design. Retained for JSON backward-compatibility with t_metadata rows + // persisted by older builds. + MissedEpochs []phase0.Epoch `json:"missed_epochs,omitempty"` } // metadataKey is the key for the metadata.