Skip to content

Commit e3f2687

Browse files
committed
do not broadcast on sync node
1 parent 6229481 commit e3f2687

8 files changed

Lines changed: 68 additions & 84 deletions

File tree

block/components.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"errors"
66
"fmt"
77

8+
"github.com/celestiaorg/go-header"
89
"github.com/rs/zerolog"
910

1011
"github.com/evstack/ev-node/block/internal/cache"
11-
"github.com/evstack/ev-node/block/internal/common"
1212
"github.com/evstack/ev-node/block/internal/executing"
1313
"github.com/evstack/ev-node/block/internal/reaping"
1414
"github.com/evstack/ev-node/block/internal/submitting"
@@ -131,8 +131,8 @@ func NewSyncComponents(
131131
store store.Store,
132132
exec coreexecutor.Executor,
133133
da coreda.DA,
134-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
135-
dataBroadcaster common.Broadcaster[*types.Data],
134+
headerStore header.Store[*types.SignedHeader],
135+
dataStore header.Store[*types.Data],
136136
logger zerolog.Logger,
137137
metrics *Metrics,
138138
blockOpts BlockOptions,
@@ -153,8 +153,8 @@ func NewSyncComponents(
153153
metrics,
154154
config,
155155
genesis,
156-
headerBroadcaster,
157-
dataBroadcaster,
156+
headerStore,
157+
dataStore,
158158
logger,
159159
blockOpts,
160160
errorCh,
@@ -183,6 +183,11 @@ func NewSyncComponents(
183183
}, nil
184184
}
185185

186+
// broadcaster interface for P2P broadcasting
187+
type broadcaster[T any] interface {
188+
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
189+
}
190+
186191
// NewAggregatorComponents creates components for an aggregator full node that can produce and sync blocks.
187192
// Aggregator nodes have full capabilities - they can produce blocks, sync from P2P and DA,
188193
// and submit headers/data to DA. Requires a signer for block production and DA submission.
@@ -194,8 +199,8 @@ func NewAggregatorComponents(
194199
sequencer coresequencer.Sequencer,
195200
da coreda.DA,
196201
signer signer.Signer,
197-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
198-
dataBroadcaster common.Broadcaster[*types.Data],
202+
headerBroadcaster broadcaster[*types.SignedHeader],
203+
dataBroadcaster broadcaster[*types.Data],
199204
logger zerolog.Logger,
200205
metrics *Metrics,
201206
blockOpts BlockOptions,

block/internal/common/expected_interfaces.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

block/internal/executing/executor.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ import (
2424
"github.com/evstack/ev-node/types"
2525
)
2626

27+
// broadcaster interface for P2P broadcasting
28+
type broadcaster[T any] interface {
29+
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
30+
}
31+
2732
// Executor handles block production, transaction processing, and state management
2833
type Executor struct {
2934
// Core components
@@ -37,8 +42,8 @@ type Executor struct {
3742
metrics *common.Metrics
3843

3944
// P2P handling
40-
headerBroadcaster common.Broadcaster[*types.SignedHeader]
41-
dataBroadcaster common.Broadcaster[*types.Data]
45+
headerBroadcaster broadcaster[*types.SignedHeader]
46+
dataBroadcaster broadcaster[*types.Data]
4247

4348
// Configuration
4449
config config.Config
@@ -76,8 +81,8 @@ func NewExecutor(
7681
metrics *common.Metrics,
7782
config config.Config,
7883
genesis genesis.Genesis,
79-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
80-
dataBroadcaster common.Broadcaster[*types.Data],
84+
headerBroadcaster broadcaster[*types.SignedHeader],
85+
dataBroadcaster broadcaster[*types.Data],
8186
logger zerolog.Logger,
8287
options common.BlockOptions,
8388
errorCh chan<- error,

block/internal/syncing/syncer.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync/atomic"
1010
"time"
1111

12+
"github.com/celestiaorg/go-header"
1213
"github.com/rs/zerolog"
1314
"golang.org/x/sync/errgroup"
1415

@@ -54,8 +55,8 @@ type Syncer struct {
5455
daHeight *atomic.Uint64
5556

5657
// P2P handling
57-
headerBroadcaster common.Broadcaster[*types.SignedHeader]
58-
dataBroadcaster common.Broadcaster[*types.Data]
58+
headerStore header.Store[*types.SignedHeader]
59+
dataStore header.Store[*types.Data]
5960

6061
// Channels for coordination
6162
heightInCh chan common.DAHeightEvent
@@ -83,28 +84,28 @@ func NewSyncer(
8384
metrics *common.Metrics,
8485
config config.Config,
8586
genesis genesis.Genesis,
86-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
87-
dataBroadcaster common.Broadcaster[*types.Data],
87+
headerStore header.Store[*types.SignedHeader],
88+
dataStore header.Store[*types.Data],
8889
logger zerolog.Logger,
8990
options common.BlockOptions,
9091
errorCh chan<- error,
9192
) *Syncer {
9293
return &Syncer{
93-
store: store,
94-
exec: exec,
95-
da: da,
96-
cache: cache,
97-
metrics: metrics,
98-
config: config,
99-
genesis: genesis,
100-
options: options,
101-
headerBroadcaster: headerBroadcaster,
102-
dataBroadcaster: dataBroadcaster,
103-
lastState: &atomic.Pointer[types.State]{},
104-
daHeight: &atomic.Uint64{},
105-
heightInCh: make(chan common.DAHeightEvent, 10_000),
106-
errorCh: errorCh,
107-
logger: logger.With().Str("component", "syncer").Logger(),
94+
store: store,
95+
exec: exec,
96+
da: da,
97+
cache: cache,
98+
metrics: metrics,
99+
config: config,
100+
genesis: genesis,
101+
options: options,
102+
headerStore: headerStore,
103+
dataStore: dataStore,
104+
lastState: &atomic.Pointer[types.State]{},
105+
daHeight: &atomic.Uint64{},
106+
heightInCh: make(chan common.DAHeightEvent, 10_000),
107+
errorCh: errorCh,
108+
logger: logger.With().Str("component", "syncer").Logger(),
108109
}
109110
}
110111

@@ -119,7 +120,7 @@ func (s *Syncer) Start(ctx context.Context) error {
119120

120121
// Initialize handlers
121122
s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger)
122-
s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.cache, s.genesis, s.logger)
123+
s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.cache, s.genesis, s.logger)
123124

124125
// Start main processing loop
125126
s.wg.Add(1)
@@ -340,13 +341,13 @@ func (s *Syncer) tryFetchFromP2P() {
340341
}
341342

342343
// Process headers
343-
newHeaderHeight := s.headerBroadcaster.Store().Height()
344+
newHeaderHeight := s.headerStore.Height()
344345
if newHeaderHeight > currentHeight {
345346
s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh)
346347
}
347348

348349
// Process data (if not already processed by headers)
349-
newDataHeight := s.dataBroadcaster.Store().Height()
350+
newDataHeight := s.dataStore.Height()
350351
if newDataHeight != newHeaderHeight && newDataHeight > currentHeight {
351352
s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh)
352353
}
@@ -406,8 +407,8 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
406407
// only save to p2p stores if the event came from DA
407408
if event.Source == common.SourceDA {
408409
g, ctx := errgroup.WithContext(s.ctx)
409-
g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) })
410-
g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) })
410+
g.Go(func() error { return s.headerStore.Append(ctx, event.Header) })
411+
g.Go(func() error { return s.dataStore.Append(ctx, event.Data) })
411412
if err := g.Wait(); err != nil {
412413
s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store")
413414
}

block/internal/syncing/syncer_backoff_test.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
goheader "github.com/celestiaorg/go-header"
109
"github.com/ipfs/go-datastore"
1110
dssync "github.com/ipfs/go-datastore/sync"
1211
"github.com/rs/zerolog"
@@ -25,19 +24,6 @@ import (
2524
"github.com/evstack/ev-node/types"
2625
)
2726

28-
// mockBroadcaster for testing
29-
type mockBroadcaster[T goheader.Header[T]] struct {
30-
store goheader.Store[T]
31-
}
32-
33-
func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error {
34-
return nil
35-
}
36-
37-
func (m *mockBroadcaster[T]) Store() goheader.Store[T] {
38-
return m.store
39-
}
40-
4127
// TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff
4228
// behavior when encountering different types of DA layer errors.
4329
func TestSyncer_BackoffOnDAError(t *testing.T) {
@@ -90,11 +76,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) {
9076

9177
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
9278
headerStore.On("Height").Return(uint64(0)).Maybe()
93-
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
79+
syncer.headerStore = headerStore
9480

9581
dataStore := mocks.NewMockStore[*types.Data](t)
9682
dataStore.On("Height").Return(uint64(0)).Maybe()
97-
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
83+
syncer.dataStore = dataStore
9884

9985
var callTimes []time.Time
10086
callCount := 0
@@ -181,11 +167,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {
181167

182168
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
183169
headerStore.On("Height").Return(uint64(0)).Maybe()
184-
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
170+
syncer.headerStore = headerStore
185171

186172
dataStore := mocks.NewMockStore[*types.Data](t)
187173
dataStore.On("Height").Return(uint64(0)).Maybe()
188-
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
174+
syncer.dataStore = dataStore
189175

190176
var callTimes []time.Time
191177

@@ -267,11 +253,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) {
267253

268254
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
269255
headerStore.On("Height").Return(uint64(0)).Maybe()
270-
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
256+
syncer.headerStore = headerStore
271257

272258
dataStore := mocks.NewMockStore[*types.Data](t)
273259
dataStore.On("Height").Return(uint64(0)).Maybe()
274-
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
260+
syncer.dataStore = dataStore
275261

276262
var callTimes []time.Time
277263

@@ -349,8 +335,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer {
349335
common.NopMetrics(),
350336
cfg,
351337
gen,
352-
&mockBroadcaster[*types.SignedHeader]{},
353-
&mockBroadcaster[*types.Data]{},
338+
&mocks.MockStore[*types.SignedHeader]{},
339+
&mocks.MockStore[*types.Data]{},
354340
zerolog.Nop(),
355341
common.DefaultBlockOptions(),
356342
make(chan error, 1),

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
152152
s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path
153153
headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b)
154154
headerP2PStore.On("Height").Return(uint64(0)).Maybe()
155-
s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore}
155+
s.headerStore = headerP2PStore
156156
dataP2PStore := mocks.NewMockStore[*types.Data](b)
157157
dataP2PStore.On("Height").Return(uint64(0)).Maybe()
158-
s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore}
158+
s.dataStore = dataP2PStore
159159
return &benchFixture{s: s, st: st, cm: cm, cancel: cancel}
160160
}

block/internal/syncing/syncer_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {
108108
common.NopMetrics(),
109109
cfg,
110110
gen,
111-
&mockBroadcaster[*types.SignedHeader]{},
112-
&mockBroadcaster[*types.Data]{},
111+
&mocks.MockStore[*types.SignedHeader]{},
112+
&mocks.MockStore[*types.Data]{},
113113
zerolog.Nop(),
114114
common.DefaultBlockOptions(),
115115
make(chan error, 1),
@@ -156,8 +156,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) {
156156
common.NopMetrics(),
157157
cfg,
158158
gen,
159-
&mockBroadcaster[*types.SignedHeader]{},
160-
&mockBroadcaster[*types.Data]{},
159+
&mocks.MockStore[*types.SignedHeader]{},
160+
&mocks.MockStore[*types.Data]{},
161161
zerolog.Nop(),
162162
common.DefaultBlockOptions(),
163163
make(chan error, 1),
@@ -206,8 +206,8 @@ func TestSequentialBlockSync(t *testing.T) {
206206
common.NopMetrics(),
207207
cfg,
208208
gen,
209-
&mockBroadcaster[*types.SignedHeader]{},
210-
&mockBroadcaster[*types.Data]{},
209+
&mocks.MockStore[*types.SignedHeader]{},
210+
&mocks.MockStore[*types.Data]{},
211211
zerolog.Nop(),
212212
common.DefaultBlockOptions(),
213213
make(chan error, 1),
@@ -337,8 +337,8 @@ func TestSyncLoopPersistState(t *testing.T) {
337337
common.NopMetrics(),
338338
cfg,
339339
gen,
340-
&mockBroadcaster[*types.SignedHeader]{mockP2PHeaderStore},
341-
&mockBroadcaster[*types.Data]{mockP2PDataStore},
340+
mockP2PHeaderStore,
341+
mockP2PDataStore,
342342
zerolog.Nop(),
343343
common.DefaultBlockOptions(),
344344
make(chan error, 1),
@@ -416,8 +416,8 @@ func TestSyncLoopPersistState(t *testing.T) {
416416
common.NopMetrics(),
417417
cfg,
418418
gen,
419-
&mockBroadcaster[*types.SignedHeader]{mockP2PHeaderStore},
420-
&mockBroadcaster[*types.Data]{mockP2PDataStore},
419+
mockP2PHeaderStore,
420+
mockP2PDataStore,
421421
zerolog.Nop(),
422422
common.DefaultBlockOptions(),
423423
make(chan error, 1),

node/full.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ func newFullNode(
120120
rktStore,
121121
exec,
122122
da,
123-
headerSyncService,
124-
dataSyncService,
123+
headerSyncService.Store(),
124+
dataSyncService.Store(),
125125
logger,
126126
blockMetrics,
127127
nodeOpts.BlockOptions,

0 commit comments

Comments
 (0)