From 50f39b1167406d2a3f1aff051cf490729e88e4a7 Mon Sep 17 00:00:00 2001 From: Kirill Simakov Date: Fri, 3 Apr 2026 16:22:58 +0200 Subject: [PATCH 1/5] Add blocks.attestations.enable flag to skip attestation saving Same pattern as blocks.blobs.enable (commit fdc70b8). When set to false, updateAttestationsForBlock and updateElectraAttestationsForBlock return early without writing to t_attestations. Useful for backfill scenarios where only deposit_requests and sync_committees are needed - avoids writing ~144 GB of attestation data. --- main.go | 2 ++ services/blocks/standard/handler.go | 8 ++++++++ services/blocks/standard/parameters.go | 19 ++++++++++++++----- services/blocks/standard/service.go | 3 +++ 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/main.go b/main.go index c2e4856..196ce64 100644 --- a/main.go +++ b/main.go @@ -162,6 +162,7 @@ func fetchConfig() error { pflag.Int32("blocks.start-slot", -1, "Slot from which to start fetching blocks") pflag.Bool("blocks.refetch", false, "Refetch all blocks even if they are already in the database") pflag.Bool("blocks.blobs.enable", true, "Enable saving of blobs for block-related information") + pflag.Bool("blocks.attestations.enable", true, "Enable saving of attestations for block-related information") pflag.Bool("finalizer.enable", true, "Enable additional information on receipt of finality checkpoint") pflag.Bool("summarizer.enable", true, "Enable summary information") pflag.Bool("summarizer.epochs.enable", true, "Enable summary information for epochs") @@ -510,6 +511,7 @@ func startBlocks( standardblocks.WithStartSlot(viper.GetInt64("blocks.start-slot")), standardblocks.WithRefetch(viper.GetBool("blocks.refetch")), standardblocks.WithBlobsSaving(viper.GetBool("blocks.blobs.enable")), + standardblocks.WithAttestationsSaving(viper.GetBool("blocks.attestations.enable")), standardblocks.WithActivitySem(activitySem), ) if err != nil { diff --git a/services/blocks/standard/handler.go b/services/blocks/standard/handler.go index 7434c44..62d2d8c 100644 --- a/services/blocks/standard/handler.go +++ b/services/blocks/standard/handler.go @@ -510,6 +510,10 @@ func (s *Service) updateAttestationsForBlock(ctx context.Context, blockRoot phase0.Root, attestations []*phase0.Attestation, ) error { + if !s.attestationsSaving { + return nil + } + ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "updateAttestationsForBlock") defer span.End() @@ -558,6 +562,10 @@ func (s *Service) updateElectraAttestationsForBlock(ctx context.Context, blockRoot phase0.Root, attestations []*electra.Attestation, ) error { + if !s.attestationsSaving { + return nil + } + ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "updateElectraAttestationsForBlock") defer span.End() diff --git a/services/blocks/standard/parameters.go b/services/blocks/standard/parameters.go index 108c1fb..005973f 100644 --- a/services/blocks/standard/parameters.go +++ b/services/blocks/standard/parameters.go @@ -31,9 +31,10 @@ type parameters struct { chainDB chaindb.Service chainTime chaintime.Service startSlot int64 - refetch bool - blobsSaving bool - activitySem *semaphore.Weighted + refetch bool + blobsSaving bool + attestationsSaving bool + activitySem *semaphore.Weighted } // Parameter is the interface for service parameters. @@ -103,6 +104,13 @@ func WithBlobsSaving(blobsSaving bool) Parameter { }) } +// WithAttestationsSaving sets the attestationsSaving flag for this module. +func WithAttestationsSaving(attestationsSaving bool) Parameter { + return parameterFunc(func(p *parameters) { + p.attestationsSaving = attestationsSaving + }) +} + // WithActivitySem sets the activity semaphore for this module. func WithActivitySem(sem *semaphore.Weighted) Parameter { return parameterFunc(func(p *parameters) { @@ -113,8 +121,9 @@ func WithActivitySem(sem *semaphore.Weighted) Parameter { // parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct. func parseAndCheckParameters(params ...Parameter) (*parameters, error) { parameters := parameters{ - logLevel: zerolog.GlobalLevel(), - startSlot: -1, + logLevel: zerolog.GlobalLevel(), + startSlot: -1, + attestationsSaving: true, } for _, p := range params { if params != nil { diff --git a/services/blocks/standard/service.go b/services/blocks/standard/service.go index d11547d..020fb0a 100644 --- a/services/blocks/standard/service.go +++ b/services/blocks/standard/service.go @@ -48,6 +48,7 @@ type Service struct { chainTime chaintime.Service refetch bool blobsSaving bool + attestationsSaving bool lastHandledBlockRoot phase0.Root activitySem *semaphore.Weighted syncCommittees map[uint64]*chaindb.SyncCommittee @@ -154,12 +155,14 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { chainTime: parameters.chainTime, refetch: parameters.refetch, blobsSaving: parameters.blobsSaving, + attestationsSaving: parameters.attestationsSaving, activitySem: parameters.activitySem, syncCommittees: make(map[uint64]*chaindb.SyncCommittee), } log.Trace(). Bool("blobsSaving", parameters.blobsSaving). + Bool("attestationsSaving", parameters.attestationsSaving). Bool("refetch", parameters.refetch). Int64("startSlot", parameters.startSlot). Msg("Blocks Service initialized") From 1a99ea753fc96e9414203282775499445ba29097 Mon Sep 17 00:00:00 2001 From: Kirill Simakov Date: Fri, 3 Apr 2026 16:29:29 +0200 Subject: [PATCH 2/5] Update README with blocks.attestations.enable config option --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a945869..da8e531 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ Data gathers four pieces of information from the beacon node, broken down by the - **Validators** The validators module provides information on the current statue of validators. It can also obtain information on the validators' balances and effective balances at a given epoch; - **Blocks** The blocks module provides information on blocks proposed for each slot. This includes: - the block structure - - attestations + - attestations (can be disabled with `blocks.attestations.enable: false`) - proposer slashings - attester slashings - deposits @@ -206,6 +206,10 @@ blocks: # blobs.enable will fetch and save blobs for block blobs: enable: true + # attestations.enable will save attestations contained in blocks. + # Disable to reduce database size during backfill when attestations are not needed. + attestations: + enable: true # validators contains configuration for obtaining validator-related information. validators: enable: true From 013462bab183d94910c1e005da23548ba8656798 Mon Sep 17 00:00:00 2001 From: Kirill Simakov Date: Sat, 18 Apr 2026 21:09:19 +0200 Subject: [PATCH 3/5] Detach validator handler from SSE event stream context In go-eth2-client v0.27.1, head event handlers receive the SSE connection context. When the SSE stream disconnects (Cloudflare timeout, network issues), the context gets canceled and kills in-flight beacon state downloads for validator balances (~325 MB, takes 8+ seconds). This was not an issue with go-eth2-client v0.24.0 where handlers did not receive the SSE context. The old DWH ChainD (v0.10.1-dbb, go-eth2-client v0.24.0) works fine with the same beacon nodes for this reason. Fix: use context.WithoutCancel() to detach from SSE context, restoring the v0.24.0 behavior. --- services/validators/standard/handler.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/services/validators/standard/handler.go b/services/validators/standard/handler.go index 2290b4f..1003adf 100644 --- a/services/validators/standard/handler.go +++ b/services/validators/standard/handler.go @@ -37,6 +37,13 @@ func (s *Service) OnBeaconChainHeadUpdated( // skipcq: RVV-A0005 epochTransition bool, ) { + // Detach from SSE event stream context. In go-eth2-client v0.27.1+, + // the head handler receives the SSE connection context. If the SSE stream + // disconnects (e.g. Cloudflare timeout), this context gets canceled and + // kills any in-flight state downloads for validator balances. + // This restores the v0.24.0 behavior where handlers were context-independent. + ctx = context.WithoutCancel(ctx) + ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "OnBeaconChainHeadUpdated", trace.WithAttributes( attribute.Int64("slot", int64(slot)), From 4bd5ff405671f23b1dd42bb1a779f399b25d94c7 Mon Sep 17 00:00:00 2001 From: Kirill Simakov Date: Tue, 21 Apr 2026 09:03:10 +0200 Subject: [PATCH 4/5] Add timing logs to validators handler Log elapsed time for each step of epoch processing: - validators fetch (state="head" download + DB compare) - balances fetch + write per epoch - total handler duration Needed to diagnose why balance gap grows overnight despite zero errors from CF/beacon node. --- services/validators/standard/handler.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/services/validators/standard/handler.go b/services/validators/standard/handler.go index 1003adf..4d1cab4 100644 --- a/services/validators/standard/handler.go +++ b/services/validators/standard/handler.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "fmt" + "time" eth2client "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" @@ -65,7 +66,8 @@ func (s *Service) OnBeaconChainHeadUpdated( return } - log.Trace().Msg("Handling epoch transition") + handlerStart := time.Now() + log.Info().Msg("Handling epoch transition") md, err := s.getMetadata(ctx) if err != nil { @@ -74,16 +76,24 @@ func (s *Service) OnBeaconChainHeadUpdated( return } + validatorsStart := time.Now() if err := s.onEpochTransitionValidators(ctx, md, epoch); err != nil { - log.Warn().Err(err).Msg("Failed to update validators") + log.Warn().Err(err).Dur("elapsed", time.Since(validatorsStart)).Msg("Failed to update validators") + } else { + log.Info().Dur("elapsed", time.Since(validatorsStart)).Msg("Validators updated") } + + balancesStart := time.Now() + balancesGap := int64(epoch) - int64(md.LatestBalancesEpoch) if err := s.onEpochTransitionValidatorBalances(ctx, md, epoch); err != nil { - log.Warn().Err(err).Msg("Failed to update validators") + log.Warn().Err(err).Dur("elapsed", time.Since(balancesStart)).Int64("catchup_epochs", balancesGap).Msg("Failed to update balances") + } else { + log.Info().Dur("elapsed", time.Since(balancesStart)).Int64("catchup_epochs", balancesGap).Msg("Balances updated") } s.activitySem.Release(1) monitorEpochProcessed(epoch) - log.Trace().Msg("Finished handling epoch transition") + log.Info().Dur("total", time.Since(handlerStart)).Msg("Finished handling epoch transition") } func (s *Service) onEpochTransitionValidators(ctx context.Context, @@ -189,7 +199,8 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context log := log.With().Uint64("epoch", uint64(epoch)).Logger() stateID := fmt.Sprintf("%d", s.chainTime.FirstSlotOfEpoch(epoch)) - log.Trace().Uint64("slot", uint64(s.chainTime.FirstSlotOfEpoch(epoch))).Msg("Fetching validators") + + fetchStart := time.Now() validatorsResponse, err := s.eth2Client.(eth2client.ValidatorsProvider).Validators(ctx, &api.ValidatorsOpts{ State: stateID, }) @@ -197,11 +208,13 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context return errors.Wrap(err, "failed to obtain validators for validator balances") } validators := validatorsResponse.Data + fetchDur := time.Since(fetchStart) span.AddEvent("Obtained validators", trace.WithAttributes( attribute.Int("slot", int(s.chainTime.FirstSlotOfEpoch(epoch))), )) + writeStart := time.Now() dbCtx, cancel, err := s.chainDB.BeginTx(ctx) if err != nil { return errors.Wrap(err, "failed to begin transaction for validator balances") @@ -248,6 +261,8 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context cancel() return errors.Wrap(err, "failed to set commit transaction for validator balances") } + writeDur := time.Since(writeStart) + log.Info().Dur("fetch", fetchDur).Dur("write", writeDur).Int("validators", len(validators)).Msg("Balance epoch processed") monitorBalancesEpochProcessed(epoch) return nil From 1ddc23dce72a4060c6dd1c1689315727d9794a94 Mon Sep 17 00:00:00 2001 From: Kirill Simakov Date: Mon, 11 May 2026 08:53:14 +0200 Subject: [PATCH 5/5] Add changelog and docker-compose example entries for new flags --- CHANGELOG.md | 5 +++++ chaind.config.docker-compose.yml | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 841f879..c79e004 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +dev: + - added `blocks.attestations.enable` configuration flag (default: true) to control whether attestations contained in blocks should be saved + - detach validator handler from SSE event-stream context so a stream reconnect does not cancel an in-flight balance write + - added fetch/write duration logs to the validator balances handler + 0.10.2: - added `blocks.blobs.enable` configuration flag (default: true) to control whether blobs for blocks should be fetched and saved diff --git a/chaind.config.docker-compose.yml b/chaind.config.docker-compose.yml index 2b4a933..4d735b8 100644 --- a/chaind.config.docker-compose.yml +++ b/chaind.config.docker-compose.yml @@ -36,6 +36,10 @@ blocks: # blobs.enable will fetch and save blobs for blocks blobs: enable: true + # attestations.enable will save attestations contained in blocks. + # Disable to reduce database size during backfill when attestations are not needed. + attestations: + enable: true # validators contains configuration for obtaining validator-related information. validators: enable: true