Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
dev:
- added `blocks.attestations.enable` configuration flag (default: true) to control whether attestations contained in blocks should be saved
- detach validator handler from SSE event-stream context so a stream reconnect does not cancel an in-flight balance write
- added fetch/write duration logs to the validator balances handler

0.10.2:
- added `blocks.blobs.enable` configuration flag (default: true) to control whether blobs for blocks should be fetched and saved

Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Data gathers four pieces of information from the beacon node, broken down by the
- **Validators** The validators module provides information on the current statue of validators. It can also obtain information on the validators' balances and effective balances at a given epoch;
- **Blocks** The blocks module provides information on blocks proposed for each slot. This includes:
- the block structure
- attestations
- attestations (can be disabled with `blocks.attestations.enable: false`)
- proposer slashings
- attester slashings
- deposits
Expand Down Expand Up @@ -206,6 +206,10 @@ blocks:
# blobs.enable will fetch and save blobs for block
blobs:
enable: true
# attestations.enable will save attestations contained in blocks.
# Disable to reduce database size during backfill when attestations are not needed.
attestations:
enable: true
# validators contains configuration for obtaining validator-related information.
validators:
enable: true
Expand Down
4 changes: 4 additions & 0 deletions chaind.config.docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ blocks:
# blobs.enable will fetch and save blobs for blocks
blobs:
enable: true
# attestations.enable will save attestations contained in blocks.
# Disable to reduce database size during backfill when attestations are not needed.
attestations:
enable: true
# validators contains configuration for obtaining validator-related information.
validators:
enable: true
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func fetchConfig() error {
pflag.Int32("blocks.start-slot", -1, "Slot from which to start fetching blocks")
pflag.Bool("blocks.refetch", false, "Refetch all blocks even if they are already in the database")
pflag.Bool("blocks.blobs.enable", true, "Enable saving of blobs for block-related information")
pflag.Bool("blocks.attestations.enable", true, "Enable saving of attestations for block-related information")
pflag.Bool("finalizer.enable", true, "Enable additional information on receipt of finality checkpoint")
pflag.Bool("summarizer.enable", true, "Enable summary information")
pflag.Bool("summarizer.epochs.enable", true, "Enable summary information for epochs")
Expand Down Expand Up @@ -510,6 +511,7 @@ func startBlocks(
standardblocks.WithStartSlot(viper.GetInt64("blocks.start-slot")),
standardblocks.WithRefetch(viper.GetBool("blocks.refetch")),
standardblocks.WithBlobsSaving(viper.GetBool("blocks.blobs.enable")),
standardblocks.WithAttestationsSaving(viper.GetBool("blocks.attestations.enable")),
standardblocks.WithActivitySem(activitySem),
)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions services/blocks/standard/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ func (s *Service) updateAttestationsForBlock(ctx context.Context,
blockRoot phase0.Root,
attestations []*phase0.Attestation,
) error {
if !s.attestationsSaving {
return nil
}

ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "updateAttestationsForBlock")
defer span.End()

Expand Down Expand Up @@ -558,6 +562,10 @@ func (s *Service) updateElectraAttestationsForBlock(ctx context.Context,
blockRoot phase0.Root,
attestations []*electra.Attestation,
) error {
if !s.attestationsSaving {
return nil
}

ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "updateElectraAttestationsForBlock")
defer span.End()

Expand Down
19 changes: 14 additions & 5 deletions services/blocks/standard/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type parameters struct {
chainDB chaindb.Service
chainTime chaintime.Service
startSlot int64
refetch bool
blobsSaving bool
activitySem *semaphore.Weighted
refetch bool
blobsSaving bool
attestationsSaving bool
activitySem *semaphore.Weighted
}

// Parameter is the interface for service parameters.
Expand Down Expand Up @@ -103,6 +104,13 @@ func WithBlobsSaving(blobsSaving bool) Parameter {
})
}

// WithAttestationsSaving sets the attestationsSaving flag for this module.
func WithAttestationsSaving(attestationsSaving bool) Parameter {
return parameterFunc(func(p *parameters) {
p.attestationsSaving = attestationsSaving
})
}

// WithActivitySem sets the activity semaphore for this module.
func WithActivitySem(sem *semaphore.Weighted) Parameter {
return parameterFunc(func(p *parameters) {
Expand All @@ -113,8 +121,9 @@ func WithActivitySem(sem *semaphore.Weighted) Parameter {
// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct.
func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
parameters := parameters{
logLevel: zerolog.GlobalLevel(),
startSlot: -1,
logLevel: zerolog.GlobalLevel(),
startSlot: -1,
attestationsSaving: true,
}
for _, p := range params {
if params != nil {
Expand Down
3 changes: 3 additions & 0 deletions services/blocks/standard/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Service struct {
chainTime chaintime.Service
refetch bool
blobsSaving bool
attestationsSaving bool
lastHandledBlockRoot phase0.Root
activitySem *semaphore.Weighted
syncCommittees map[uint64]*chaindb.SyncCommittee
Expand Down Expand Up @@ -154,12 +155,14 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
chainTime: parameters.chainTime,
refetch: parameters.refetch,
blobsSaving: parameters.blobsSaving,
attestationsSaving: parameters.attestationsSaving,
activitySem: parameters.activitySem,
syncCommittees: make(map[uint64]*chaindb.SyncCommittee),
}

log.Trace().
Bool("blobsSaving", parameters.blobsSaving).
Bool("attestationsSaving", parameters.attestationsSaving).
Bool("refetch", parameters.refetch).
Int64("startSlot", parameters.startSlot).
Msg("Blocks Service initialized")
Expand Down
32 changes: 27 additions & 5 deletions services/validators/standard/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"fmt"
"time"

eth2client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
Expand All @@ -37,6 +38,13 @@ func (s *Service) OnBeaconChainHeadUpdated(
// skipcq: RVV-A0005
epochTransition bool,
) {
// Detach from SSE event stream context. In go-eth2-client v0.27.1+,
// the head handler receives the SSE connection context. If the SSE stream
// disconnects (e.g. Cloudflare timeout), this context gets canceled and
// kills any in-flight state downloads for validator balances.
// This restores the v0.24.0 behavior where handlers were context-independent.
ctx = context.WithoutCancel(ctx)

ctx, span := otel.Tracer("wealdtech.chaind.services.blocks.standard").Start(ctx, "OnBeaconChainHeadUpdated",
trace.WithAttributes(
attribute.Int64("slot", int64(slot)),
Expand All @@ -58,7 +66,8 @@ func (s *Service) OnBeaconChainHeadUpdated(
return
}

log.Trace().Msg("Handling epoch transition")
handlerStart := time.Now()
log.Info().Msg("Handling epoch transition")

md, err := s.getMetadata(ctx)
if err != nil {
Expand All @@ -67,16 +76,24 @@ func (s *Service) OnBeaconChainHeadUpdated(
return
}

validatorsStart := time.Now()
if err := s.onEpochTransitionValidators(ctx, md, epoch); err != nil {
log.Warn().Err(err).Msg("Failed to update validators")
log.Warn().Err(err).Dur("elapsed", time.Since(validatorsStart)).Msg("Failed to update validators")
} else {
log.Info().Dur("elapsed", time.Since(validatorsStart)).Msg("Validators updated")
}

balancesStart := time.Now()
balancesGap := int64(epoch) - int64(md.LatestBalancesEpoch)
if err := s.onEpochTransitionValidatorBalances(ctx, md, epoch); err != nil {
log.Warn().Err(err).Msg("Failed to update validators")
log.Warn().Err(err).Dur("elapsed", time.Since(balancesStart)).Int64("catchup_epochs", balancesGap).Msg("Failed to update balances")
} else {
log.Info().Dur("elapsed", time.Since(balancesStart)).Int64("catchup_epochs", balancesGap).Msg("Balances updated")
}
s.activitySem.Release(1)

monitorEpochProcessed(epoch)
log.Trace().Msg("Finished handling epoch transition")
log.Info().Dur("total", time.Since(handlerStart)).Msg("Finished handling epoch transition")
}

func (s *Service) onEpochTransitionValidators(ctx context.Context,
Expand Down Expand Up @@ -182,19 +199,22 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context

log := log.With().Uint64("epoch", uint64(epoch)).Logger()
stateID := fmt.Sprintf("%d", s.chainTime.FirstSlotOfEpoch(epoch))
log.Trace().Uint64("slot", uint64(s.chainTime.FirstSlotOfEpoch(epoch))).Msg("Fetching validators")

fetchStart := time.Now()
validatorsResponse, err := s.eth2Client.(eth2client.ValidatorsProvider).Validators(ctx, &api.ValidatorsOpts{
State: stateID,
})
if err != nil {
return errors.Wrap(err, "failed to obtain validators for validator balances")
}
validators := validatorsResponse.Data
fetchDur := time.Since(fetchStart)

span.AddEvent("Obtained validators", trace.WithAttributes(
attribute.Int("slot", int(s.chainTime.FirstSlotOfEpoch(epoch))),
))

writeStart := time.Now()
dbCtx, cancel, err := s.chainDB.BeginTx(ctx)
if err != nil {
return errors.Wrap(err, "failed to begin transaction for validator balances")
Expand Down Expand Up @@ -241,6 +261,8 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context
cancel()
return errors.Wrap(err, "failed to set commit transaction for validator balances")
}
writeDur := time.Since(writeStart)
log.Info().Dur("fetch", fetchDur).Dur("write", writeDur).Int("validators", len(validators)).Msg("Balance epoch processed")
monitorBalancesEpochProcessed(epoch)

return nil
Expand Down