Skip to content

Commit dcc43ba

Browse files
committed
reduce diff
1 parent b36b791 commit dcc43ba

9 files changed

Lines changed: 347 additions & 41 deletions

File tree

block/components.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/evstack/ev-node/pkg/telemetry"
2525
)
2626

27+
// Components represents the block-related components
2728
type Components struct {
2829
Executor *executing.Executor
2930
Pruner *pruner.Pruner
@@ -32,12 +33,16 @@ type Components struct {
3233
Submitter *submitting.Submitter
3334
Cache cache.Manager
3435

36+
// Error channel for critical failures that should stop the node
3537
errorCh chan error
3638
}
3739

40+
// Start starts all components and monitors for critical errors.
41+
// It is blocking and returns when the context is cancelled or an error occurs
3842
func (bc *Components) Start(ctx context.Context) error {
3943
ctxWithCancel, cancel := context.WithCancel(ctx)
4044

45+
// error monitoring goroutine
4146
criticalErrCh := make(chan error, 1)
4247
go func() {
4348
select {
@@ -75,8 +80,10 @@ func (bc *Components) Start(ctx context.Context) error {
7580
}
7681
}
7782

83+
// wait for context cancellation (either from parent or critical error)
7884
<-ctxWithCancel.Done()
7985

86+
// if we got here due to a critical error, return that error
8087
select {
8188
case err := <-criticalErrCh:
8289
return fmt.Errorf("node stopped due to critical execution client failure: %w", err)
@@ -85,6 +92,7 @@ func (bc *Components) Start(ctx context.Context) error {
8592
}
8693
}
8794

95+
// Stop stops all components
8896
func (bc *Components) Stop() error {
8997
var errs error
9098
if bc.Executor != nil {
@@ -121,6 +129,9 @@ func (bc *Components) Stop() error {
121129
return errs
122130
}
123131

132+
// NewSyncComponents creates components for a non-aggregator full node that can only sync blocks.
133+
// Non-aggregator full nodes can sync from DA but cannot produce blocks or submit to DA.
134+
// They have more sync capabilities than light nodes but no block production. No signer required.
124135
func NewSyncComponents(
125136
config config.Config,
126137
genesis genesis.Genesis,
@@ -137,6 +148,7 @@ func NewSyncComponents(
137148
return nil, fmt.Errorf("failed to create cache manager: %w", err)
138149
}
139150

151+
// error channel for critical failures
140152
errorCh := make(chan error, 1)
141153

142154
syncer := syncing.NewSyncer(
@@ -163,6 +175,7 @@ func NewSyncComponents(
163175
}
164176
prunerObj := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
165177

178+
// Create submitter for sync nodes (no signer, only DA inclusion processing)
166179
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
167180
if config.Instrumentation.IsTracingEnabled() {
168181
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
@@ -175,8 +188,8 @@ func NewSyncComponents(
175188
config,
176189
genesis,
177190
daSubmitter,
178-
nil,
179-
nil,
191+
nil, // No sequencer for sync nodes
192+
nil, // No signer for sync nodes
180193
logger,
181194
errorCh,
182195
)
@@ -190,6 +203,9 @@ func NewSyncComponents(
190203
}, nil
191204
}
192205

206+
// newAggregatorComponents creates components for an aggregator full node that can produce and sync blocks.
207+
// Aggregator nodes have full capabilities - they can produce blocks, sync from DA,
208+
// and submit headers/data to DA. Requires a signer for block production and DA submission.
193209
func newAggregatorComponents(
194210
config config.Config,
195211
genesis genesis.Genesis,
@@ -208,8 +224,10 @@ func newAggregatorComponents(
208224
return nil, fmt.Errorf("failed to create cache manager: %w", err)
209225
}
210226

227+
// error channel for critical failures
211228
errorCh := make(chan error, 1)
212229

230+
// wrap sequencer with tracing if enabled
213231
if config.Instrumentation.IsTracingEnabled() {
214232
sequencer = telemetry.WithTracingSequencer(sequencer)
215233
}
@@ -255,7 +273,7 @@ func newAggregatorComponents(
255273
return nil, fmt.Errorf("failed to create reaper: %w", err)
256274
}
257275

258-
if config.Node.BasedSequencer {
276+
if config.Node.BasedSequencer { // no submissions needed for based sequencer
259277
return &Components{
260278
Executor: executor,
261279
Pruner: prunerObj,
@@ -278,7 +296,7 @@ func newAggregatorComponents(
278296
genesis,
279297
daSubmitter,
280298
sequencer,
281-
signer,
299+
signer, // Signer for aggregator nodes to submit to DA
282300
logger,
283301
errorCh,
284302
)
@@ -293,6 +311,13 @@ func newAggregatorComponents(
293311
}, nil
294312
}
295313

314+
// NewAggregatorWithCatchupComponents creates aggregator components that include a Syncer
315+
// for DA catchup before block production begins.
316+
//
317+
// The caller should:
318+
// 1. Start the Syncer and wait for DA head catchup
319+
// 2. Stop the Syncer and set Components.Syncer = nil
320+
// 3. Call Components.Start() — which will start the Executor and other components
296321
func NewAggregatorWithCatchupComponents(
297322
config config.Config,
298323
genesis genesis.Genesis,
@@ -314,6 +339,7 @@ func NewAggregatorWithCatchupComponents(
314339
return nil, err
315340
}
316341

342+
// Create a catchup syncer that shares the same cache manager
317343
catchupErrCh := make(chan error, 1)
318344
catchupSyncer := syncing.NewSyncer(
319345
store,

block/internal/common/event.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,40 @@ import (
66
"github.com/evstack/ev-node/types"
77
)
88

9+
// EventSource represents the origin of a block event
910
type EventSource string
1011

1112
const (
12-
SourceDA EventSource = "da"
13+
// SourceDA indicates the event came from the DA layer
14+
SourceDA EventSource = "da"
15+
// SourceRaft indicates the event came from Raft consensus recovery
1316
SourceRaft EventSource = "raft"
1417
)
1518

19+
// AllEventSources returns all possible event sources.
1620
func AllEventSources() []EventSource {
1721
return []EventSource{SourceDA, SourceRaft}
1822
}
1923

24+
// DAHeightEvent represents a DA event for caching
2025
type DAHeightEvent struct {
21-
Header *types.SignedHeader
22-
Data *types.Data
26+
Header *types.SignedHeader
27+
Data *types.Data
28+
// DaHeight corresponds to the highest DA included height between the Header and Data.
2329
DaHeight uint64
24-
Source EventSource
30+
// Source indicates where this event originated from (DA or Raft)
31+
Source EventSource
2532
}
2633

34+
// EventSink receives parsed DA events with backpressure support.
2735
type EventSink interface {
2836
PipeEvent(ctx context.Context, event DAHeightEvent) error
2937
}
3038

39+
// EventSinkFunc adapts a plain function to the EventSink interface.
40+
// Useful in tests:
41+
//
42+
// sink := common.EventSinkFunc(func(ctx context.Context, ev common.DAHeightEvent) error { return nil })
3143
type EventSinkFunc func(ctx context.Context, event DAHeightEvent) error
3244

3345
func (f EventSinkFunc) PipeEvent(ctx context.Context, event DAHeightEvent) error {

0 commit comments

Comments
 (0)