Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,11 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {

// Skip if already processed
if height <= currentHeight || s.cache.IsHeaderSeen(headerHash) {
if event.Source == common.SourceDA {
if err := s.updateStateDAHeight(event.DaHeight); err != nil {
s.logger.Error().Err(err).Msg("failed to update DA height")
}
}
s.logger.Debug().Uint64("height", height).Msg("height already processed")
return
}
Expand Down Expand Up @@ -567,16 +572,10 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
return fmt.Errorf("failed to update height: %w", err)
}

if err := batch.UpdateState(newState); err != nil {
return fmt.Errorf("failed to update state: %w", err)
}

if err := batch.Commit(); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
if err := s.saveState(batch, newState); err != nil {
return fmt.Errorf("failed to save state: %w", err)
}

// Update in-memory state after successful commit
s.SetLastState(newState)
s.metrics.Height.Set(float64(newState.LastBlockHeight))

// Mark as seen
Expand Down Expand Up @@ -779,3 +778,32 @@ func (s *Syncer) cancelP2PWait(height uint64) {
state.cancel()
}
}

func (s *Syncer) updateStateDAHeight(daHeight uint64) error {
currentState := s.GetLastState()
if daHeight <= currentState.DAHeight {
return nil
}

currentState.DAHeight = daHeight

batch, err := s.store.NewBatch(s.ctx)
if err != nil {
return fmt.Errorf("failed to create batch: %w", err)
}

return s.saveState(batch, currentState)
}

func (s *Syncer) saveState(batch store.Batch, state types.State) error {
if err := batch.UpdateState(state); err != nil {
return fmt.Errorf("failed to update state: %w", err)
}

if err := batch.Commit(); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

s.SetLastState(state)
return nil
}
98 changes: 98 additions & 0 deletions block/internal/syncing/syncer_da_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package syncing

import (
"context"
"testing"
"time"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/store"
testmocks "github.com/evstack/ev-node/test/mocks"
"github.com/evstack/ev-node/types"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestProcessHeightEvent_UpdatesDAHeight_WhenAlreadyProcessed(t *testing.T) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
st := store.New(ds)
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
require.NoError(t, err)

addr, pub, signer := buildSyncTestSigner(t)

cfg := config.DefaultConfig()
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using time.Now() in tests can introduce non-determinism, potentially leading to flaky tests in the future. It is a best practice to use a fixed timestamp to ensure that tests are fully reproducible.

Suggested change
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC), ProposerAddress: addr}


mockExec := testmocks.NewMockExecutor(t)
mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once()

errChan := make(chan error, 1)
s := NewSyncer(
st,
mockExec,
nil,
cm,
common.NopMetrics(),
cfg,
gen,
common.NewMockBroadcaster[*types.SignedHeader](t),
common.NewMockBroadcaster[*types.Data](t),
zerolog.Nop(),
common.DefaultBlockOptions(),
errChan,
)

require.NoError(t, s.initializeState())
s.ctx = context.Background()

// 1. Sync a block normally (simulating P2P sync or previous DA sync)
lastState := s.GetLastState()
data := makeData(gen.ChainID, 1, 0)
_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil)

// Expect ExecuteTxs call for height 1
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash).
Return([]byte("app1"), uint64(1024), nil).Once()

// Process the event (simulate P2P source to NOT update DA height initially if we wanted,
// but here we just want to get the block synced.
// Let's say it came from P2P, so DA height remains at genesis (0 or whatever start is).
evtP2P := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 0, Source: common.SourceP2P}
s.processHeightEvent(&evtP2P)

requireEmptyChan(t, errChan)

// Verify block is synced
h, err := st.Height(context.Background())
require.NoError(t, err)
assert.Equal(t, uint64(1), h)

// Verify current DA height is still initial (0)
currentState := s.GetLastState()
assert.Equal(t, uint64(0), currentState.DAHeight)

// 2. Now receive the SAME block from DA, but with a higher DA height
// This simulates the "processed block is higher (or equal) than what is being synced from da" scenario
// where we want to ensure DA height is updated.
daHeight := uint64(100)
evtDA := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: daHeight, Source: common.SourceDA}

s.processHeightEvent(&evtDA)

// 3. Verify DA height is updated in state
updatedState := s.GetLastState()
assert.Equal(t, daHeight, updatedState.DAHeight)

// Verify it's persisted
storedState, err := st.GetState(context.Background())
require.NoError(t, err)
assert.Equal(t, daHeight, storedState.DAHeight)
}
Loading