diff --git a/CHANGELOG.md b/CHANGELOG.md index 841f879..c79e004 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +dev: + - added `blocks.attestations.enable` configuration flag (default: true) to control whether attestations contained in blocks should be saved + - detach validator handler from SSE event-stream context so a stream reconnect does not cancel an in-flight balance write + - added fetch/write duration logs to the validator balances handler + 0.10.2: - added `blocks.blobs.enable` configuration flag (default: true) to control whether blobs for blocks should be fetched and saved diff --git a/README.md b/README.md index a945869..da8e531 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ Data gathers four pieces of information from the beacon node, broken down by the - **Validators** The validators module provides information on the current statue of validators. It can also obtain information on the validators' balances and effective balances at a given epoch; - **Blocks** The blocks module provides information on blocks proposed for each slot. This includes: - the block structure - - attestations + - attestations (can be disabled with `blocks.attestations.enable: false`) - proposer slashings - attester slashings - deposits @@ -206,6 +206,10 @@ blocks: # blobs.enable will fetch and save blobs for block blobs: enable: true + # attestations.enable will save attestations contained in blocks. + # Disable to reduce database size during backfill when attestations are not needed. + attestations: + enable: true # validators contains configuration for obtaining validator-related information. validators: enable: true diff --git a/chaind.config.docker-compose.yml b/chaind.config.docker-compose.yml index 2b4a933..4d735b8 100644 --- a/chaind.config.docker-compose.yml +++ b/chaind.config.docker-compose.yml @@ -36,6 +36,10 @@ blocks: # blobs.enable will fetch and save blobs for blocks blobs: enable: true + # attestations.enable will save attestations contained in blocks. + # Disable to reduce database size during backfill when attestations are not needed. + attestations: + enable: true # validators contains configuration for obtaining validator-related information. validators: enable: true diff --git a/main.go b/main.go index c2e4856..196ce64 100644 --- a/main.go +++ b/main.go @@ -162,6 +162,7 @@ func fetchConfig() error { pflag.Int32("blocks.start-slot", -1, "Slot from which to start fetching blocks") pflag.Bool("blocks.refetch", false, "Refetch all blocks even if they are already in the database") pflag.Bool("blocks.blobs.enable", true, "Enable saving of blobs for block-related information") + pflag.Bool("blocks.attestations.enable", true, "Enable saving of attestations for block-related information") pflag.Bool("finalizer.enable", true, "Enable additional information on receipt of finality checkpoint") pflag.Bool("summarizer.enable", true, "Enable summary information") pflag.Bool("summarizer.epochs.enable", true, "Enable summary information for epochs") @@ -510,6 +511,7 @@ func startBlocks( standardblocks.WithStartSlot(viper.GetInt64("blocks.start-slot")), standardblocks.WithRefetch(viper.GetBool("blocks.refetch")), standardblocks.WithBlobsSaving(viper.GetBool("blocks.blobs.enable")), + standardblocks.WithAttestationsSaving(viper.GetBool("blocks.attestations.enable")), standardblocks.WithActivitySem(activitySem), ) if err != nil { diff --git a/services/blocks/standard/handler.go b/services/blocks/standard/handler.go index 7434c44..62d2d8c 100644 --- a/services/blocks/standard/handler.go +++ b/services/blocks/standard/handler.go @@ -510,6 +510,10 @@ func (s *Service) updateAttestationsForBlock(ctx context.Context, blockRoot phase0.Root, attestations []*phase0.Attestation, ) error { + if !s.attestationsSaving { + return nil + } + ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "updateAttestationsForBlock") defer span.End() @@ -558,6 +562,10 @@ func (s *Service) updateElectraAttestationsForBlock(ctx context.Context, blockRoot phase0.Root, attestations []*electra.Attestation, ) error { + if !s.attestationsSaving { + return nil + } + ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "updateElectraAttestationsForBlock") defer span.End() diff --git a/services/blocks/standard/parameters.go b/services/blocks/standard/parameters.go index 108c1fb..005973f 100644 --- a/services/blocks/standard/parameters.go +++ b/services/blocks/standard/parameters.go @@ -31,9 +31,10 @@ type parameters struct { chainDB chaindb.Service chainTime chaintime.Service startSlot int64 - refetch bool - blobsSaving bool - activitySem *semaphore.Weighted + refetch bool + blobsSaving bool + attestationsSaving bool + activitySem *semaphore.Weighted } // Parameter is the interface for service parameters. @@ -103,6 +104,13 @@ func WithBlobsSaving(blobsSaving bool) Parameter { }) } +// WithAttestationsSaving sets the attestationsSaving flag for this module. +func WithAttestationsSaving(attestationsSaving bool) Parameter { + return parameterFunc(func(p *parameters) { + p.attestationsSaving = attestationsSaving + }) +} + // WithActivitySem sets the activity semaphore for this module. func WithActivitySem(sem *semaphore.Weighted) Parameter { return parameterFunc(func(p *parameters) { @@ -113,8 +121,9 @@ func WithActivitySem(sem *semaphore.Weighted) Parameter { // parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct. func parseAndCheckParameters(params ...Parameter) (*parameters, error) { parameters := parameters{ - logLevel: zerolog.GlobalLevel(), - startSlot: -1, + logLevel: zerolog.GlobalLevel(), + startSlot: -1, + attestationsSaving: true, } for _, p := range params { if params != nil { diff --git a/services/blocks/standard/service.go b/services/blocks/standard/service.go index d11547d..020fb0a 100644 --- a/services/blocks/standard/service.go +++ b/services/blocks/standard/service.go @@ -48,6 +48,7 @@ type Service struct { chainTime chaintime.Service refetch bool blobsSaving bool + attestationsSaving bool lastHandledBlockRoot phase0.Root activitySem *semaphore.Weighted syncCommittees map[uint64]*chaindb.SyncCommittee @@ -154,12 +155,14 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { chainTime: parameters.chainTime, refetch: parameters.refetch, blobsSaving: parameters.blobsSaving, + attestationsSaving: parameters.attestationsSaving, activitySem: parameters.activitySem, syncCommittees: make(map[uint64]*chaindb.SyncCommittee), } log.Trace(). Bool("blobsSaving", parameters.blobsSaving). + Bool("attestationsSaving", parameters.attestationsSaving). Bool("refetch", parameters.refetch). Int64("startSlot", parameters.startSlot). Msg("Blocks Service initialized") diff --git a/services/validators/standard/handler.go b/services/validators/standard/handler.go index 2290b4f..4d1cab4 100644 --- a/services/validators/standard/handler.go +++ b/services/validators/standard/handler.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "fmt" + "time" eth2client "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" @@ -37,6 +38,13 @@ func (s *Service) OnBeaconChainHeadUpdated( // skipcq: RVV-A0005 epochTransition bool, ) { + // Detach from SSE event stream context. In go-eth2-client v0.27.1+, + // the head handler receives the SSE connection context. If the SSE stream + // disconnects (e.g. Cloudflare timeout), this context gets canceled and + // kills any in-flight state downloads for validator balances. + // This restores the v0.24.0 behavior where handlers were context-independent. + ctx = context.WithoutCancel(ctx) + ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "OnBeaconChainHeadUpdated", trace.WithAttributes( attribute.Int64("slot", int64(slot)), @@ -58,7 +66,8 @@ func (s *Service) OnBeaconChainHeadUpdated( return } - log.Trace().Msg("Handling epoch transition") + handlerStart := time.Now() + log.Info().Msg("Handling epoch transition") md, err := s.getMetadata(ctx) if err != nil { @@ -67,16 +76,24 @@ func (s *Service) OnBeaconChainHeadUpdated( return } + validatorsStart := time.Now() if err := s.onEpochTransitionValidators(ctx, md, epoch); err != nil { - log.Warn().Err(err).Msg("Failed to update validators") + log.Warn().Err(err).Dur("elapsed", time.Since(validatorsStart)).Msg("Failed to update validators") + } else { + log.Info().Dur("elapsed", time.Since(validatorsStart)).Msg("Validators updated") } + + balancesStart := time.Now() + balancesGap := int64(epoch) - int64(md.LatestBalancesEpoch) if err := s.onEpochTransitionValidatorBalances(ctx, md, epoch); err != nil { - log.Warn().Err(err).Msg("Failed to update validators") + log.Warn().Err(err).Dur("elapsed", time.Since(balancesStart)).Int64("catchup_epochs", balancesGap).Msg("Failed to update balances") + } else { + log.Info().Dur("elapsed", time.Since(balancesStart)).Int64("catchup_epochs", balancesGap).Msg("Balances updated") } s.activitySem.Release(1) monitorEpochProcessed(epoch) - log.Trace().Msg("Finished handling epoch transition") + log.Info().Dur("total", time.Since(handlerStart)).Msg("Finished handling epoch transition") } func (s *Service) onEpochTransitionValidators(ctx context.Context, @@ -182,7 +199,8 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context log := log.With().Uint64("epoch", uint64(epoch)).Logger() stateID := fmt.Sprintf("%d", s.chainTime.FirstSlotOfEpoch(epoch)) - log.Trace().Uint64("slot", uint64(s.chainTime.FirstSlotOfEpoch(epoch))).Msg("Fetching validators") + + fetchStart := time.Now() validatorsResponse, err := s.eth2Client.(eth2client.ValidatorsProvider).Validators(ctx, &api.ValidatorsOpts{ State: stateID, }) @@ -190,11 +208,13 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context return errors.Wrap(err, "failed to obtain validators for validator balances") } validators := validatorsResponse.Data + fetchDur := time.Since(fetchStart) span.AddEvent("Obtained validators", trace.WithAttributes( attribute.Int("slot", int(s.chainTime.FirstSlotOfEpoch(epoch))), )) + writeStart := time.Now() dbCtx, cancel, err := s.chainDB.BeginTx(ctx) if err != nil { return errors.Wrap(err, "failed to begin transaction for validator balances") @@ -241,6 +261,8 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context cancel() return errors.Wrap(err, "failed to set commit transaction for validator balances") } + writeDur := time.Since(writeStart) + log.Info().Dur("fetch", fetchDur).Dur("write", writeDur).Int("validators", len(validators)).Msg("Balance epoch processed") monitorBalancesEpochProcessed(epoch) return nil