Commit 5234506

fix(fiber): bound runner memory under sustained Fibre load (#3306)
* fix(solo,reaping): bound sequencer queue to prevent ingest-side OOM

  Under sustained ingest above the block-production drain rate, SoloSequencer.queue grew monotonically. A 32-vCPU loadgen pushing >100 MB/s into a runner whose executor drains ~100 MB/s per block filled the queue at ~150 MB/s of net-positive growth: heap profiles showed 24 GB of retained io.ReadAll bytes in the queue within ~30 s, then anon-rss:63GB OOM-kill at the box's 64 GiB ceiling. Reproducible twice with identical signature.

  Two changes, one feature:

  - SoloSequencer.SetMaxQueueBytes(n) caps the queue's total retained tx bytes. SubmitBatchTxs uses all-or-nothing admission against the cap: if the incoming batch would push us over, the whole batch is rejected with ErrQueueFull and the queue keeps its current contents untouched. Partial admission would force the caller to track which prefix succeeded and only re-feed the suffix on retry; the reaper currently doesn't do that, so the whole-batch rule lets the reaper just retry the same batch later when the queue has drained. queueBytes is decremented on drain (queue := nil) and re-counted for postponed txs that the executor's FilterTxs returns to the queue. Zero cap = the legacy unbounded path, preserved for tests and small deployments.
  - The reaper bridging executor mempool → sequencer matches ErrQueueFull via errors.Is and treats it as transient backpressure: marks the rejected hashes as "seen" so the next reaper tick doesn't re-hash + re-submit the same already-rejected txs forever, logs a warn line with the dropped count, and continues running. Without this match every queue-full event would tear the daemon down via the existing fatal-on-submit-error path.

  Loadgen sees the backpressure indirectly: with the sequencer queue full, the executor's txChan stops draining, /tx blocks on its bounded channel send, and txsim observes 5xx / timeouts, cleanly applied at the application layer instead of via the kernel OOM-killer.

* fix(evnode-fibre): enforce maxBytes in inMemExecutor.FilterTxs

  The stub executor used by the runner returned FilterOK for every transaction unconditionally, ignoring the maxBytes budget plumbed through SoloSequencer.GetNextBatch. Under sustained txsim load (~50 MiB/s, 8 concurrent senders) the mempool would accumulate ~50K txs while a 100 MiB upload was in flight; on the next batch the sequencer drained ALL of them into one block (~369 MiB raw), the submitter saw a single item exceeding the per-blob cap, and halted the node with `single item exceeds DA blob size limit`.

  Walk the input txs in arrival order, accumulate sizes against maxBytes, and return FilterPostpone past the budget so the sequencer puts the overflow back on its queue.

  Verified live: blocks now cap at ~10K txs / ~100 MiB and evnode sustains 58.77 MB/s DA upload throughput through a 5-min txsim run with zero crashes (was 0 → crash within 30 s before this fix).

* fix(evnode-fibre): wire sequencer queue cap + lift ingest queue caps

  Two runner-side changes paired with the SoloSequencer bound:

  - After constructing the SoloSequencer, call SetMaxQueueBytes with 10× the per-block tx budget (= 1 GiB at the current 100 MiB MaxBlobSize). 10× is the sweet spot: large enough that a short burst above steady-state ingest doesn't trigger backpressure (we want to absorb that), small enough that the worst-case retained bytes fit comfortably under the box's RAM budget alongside the pending cache + DA in-flight buffers.
  - Lift the inMemExecutor's hardcoded ingest caps. txChan and maxBlockTxs were sized at 500 (5 MB / 5K txs per reaper poll) back when those were the only memory bound on the runner. With the SetMaxQueueBytes cap and the FilterTxs-enforced per-block budget now actually doing the bounding, the ingest queue can hold a full 100 MiB block-worth of txs (10K slots at 10 KB) without burdening memory, and a single reaper poll can drain that whole batch in one GetTxs call instead of needing 20× cycles. This was the binding constraint at ~5,000 tx/s = 50 MB/s in earlier runs.

* fix(config): tighten Fiber pending cap to 10 to bound submitter memory

  ApplyFiberDefaults set MaxPendingHeadersAndData=50, but each pending data item under Fiber is up to MaxBlobSize (~100 MiB raw). With 3-FSP fan-out and per-attempt retry buffers in flight, 50 items × 3 × retries crossed 64 GiB on c6in.8xlarge under sustained txsim load and the kernel OOM-killed evnode 30 s into the run. 10 keeps the in-flight footprint bounded while still letting healthy uploads pipeline against the actual Fibre RPC latency.

  Verified by heap profiling: pending pause at ~10 × 100 MiB plus fan-out keeps RSS below ~10 GiB, evnode runs indefinitely.
1 parent 513ce9b commit 5234506

4 files changed

Lines changed: 125 additions & 20 deletions

block/internal/reaping/reaper.go

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ import (
1616
coreexecutor "github.com/evstack/ev-node/core/execution"
1717
coresequencer "github.com/evstack/ev-node/core/sequencer"
1818
"github.com/evstack/ev-node/pkg/genesis"
19+
"github.com/evstack/ev-node/pkg/sequencers/solo"
1920
)
2021

2122
const (
@@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
193194
Id: []byte(r.chainID),
194195
Batch: &coresequencer.Batch{Transactions: newTxs},
195196
})
197+
if errors.Is(err, solo.ErrQueueFull) {
198+
// Sequencer queue is full — backpressure signal. Mark the
199+
// batch as "seen" so we don't waste cycles re-hashing the
200+
// same dropped txs every reaper tick, and surface the drop
201+
// as a warning rather than tearing down the daemon. The
202+
// loadgen sees lower acceptance via /tx flow control once
203+
// the executor's own mempool fills up.
204+
r.cache.SetTxsSeen(newHashes)
205+
r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)")
206+
break
207+
}
196208
if err != nil {
197209
return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
198210
}

pkg/config/config.go

Lines changed: 6 additions & 1 deletion
@@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() {
367367
}
368368

369369
c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second}
370-
c.Node.MaxPendingHeadersAndData = 50
370+
// Tighter pending cap (was 50). At 50, a Fibre upload stall lets the
371+
// submitter accumulate 50 × ~32 MiB blob copies + their per-validator
372+
// retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and
373+
// OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint
374+
// bounded while still letting healthy uploads pipeline.
375+
c.Node.MaxPendingHeadersAndData = 10
371376
}
372377

373378
// GetNamespace returns the namespace for header submissions.
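
The footprint argument behind the lower cap can be checked with back-of-the-envelope arithmetic. The sketch below takes the commit message's upper bound of ~100 MiB per pending item and its 3-way fan-out; `worstCaseBytes` and its `attempts` parameter are hypothetical, and the number of buffered retry attempts is an assumption rather than a measured value:

```go
package main

import "fmt"

const mib = 1 << 20

// worstCaseBytes estimates submitter memory as
// pending items x blob size x fan-out x buffered attempts.
// attempts is an assumed knob; the commit does not state a retry count.
func worstCaseBytes(pending, blobBytes, fanOut, attempts uint64) uint64 {
	return pending * blobBytes * fanOut * attempts
}

func main() {
	for _, pending := range []uint64{50, 10} {
		b := worstCaseBytes(pending, 100*mib, 3, 1)
		fmt.Printf("pending=%d: %.1f GiB per buffered attempt\n", pending, float64(b)/(1<<30))
	}
}
```

Under these assumptions a cap of 50 already retains about 14.6 GiB per buffered attempt, so a handful of retry buffers is enough to approach a 64 GiB box, while a cap of 10 stays near 2.9 GiB per attempt.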

pkg/sequencers/solo/sequencer.go

Lines changed: 64 additions & 3 deletions
@@ -14,7 +14,15 @@ import (
1414
coresequencer "github.com/evstack/ev-node/core/sequencer"
1515
)
1616

17-
var ErrInvalidID = errors.New("invalid chain id")
17+
var (
18+
ErrInvalidID = errors.New("invalid chain id")
19+
// ErrQueueFull is returned from SubmitBatchTxs when admitting the batch
20+
// would exceed the queue's byte cap (see SetMaxQueueBytes). Callers should
21+
// treat this as transient backpressure (drop or retry); the
22+
// reaper bridging executor mempool → sequencer matches it via
23+
// errors.Is and downgrades to a warning.
24+
ErrQueueFull = errors.New("sequencer queue full")
25+
)
1826

1927
var (
2028
emptyBatch = &coresequencer.Batch{}
@@ -27,15 +35,26 @@ var _ coresequencer.Sequencer = (*SoloSequencer)(nil)
2735
// SoloSequencer is a single-leader sequencer without forced inclusion
2836
// support. It maintains a simple in-memory queue of mempool transactions and
2937
// produces batches on demand.
38+
//
39+
// The queue can be bounded in bytes via SetMaxQueueBytes. A bound is
40+
// strongly recommended in any high-throughput configuration: under
41+
// sustained ingest above the block-production drain rate the queue
42+
// otherwise grows monotonically until OOM. With a bound set,
43+
// SubmitBatchTxs admits an incoming batch only if all of it fits and
44+
// otherwise rejects the whole batch with ErrQueueFull, so callers can
45+
// surface backpressure (e.g. via HTTP 503) instead of silently
46+
// retaining bytes.
3047
type SoloSequencer struct {
3148
logger zerolog.Logger
3249
id []byte
3350
executor execution.Executor
3451

3552
daHeight atomic.Uint64
3653

37-
mu sync.Mutex
38-
queue [][]byte
54+
mu sync.Mutex
55+
queue [][]byte
56+
queueBytes uint64
57+
maxQueueBytes uint64 // 0 = unbounded (legacy default)
3958
}
4059

4160
func NewSoloSequencer(
@@ -51,6 +70,16 @@ func NewSoloSequencer(
5170
}
5271
}
5372

73+
// SetMaxQueueBytes sets a soft cap on the total tx bytes retained in the
74+
// sequencer's in-memory queue. SubmitBatchTxs admits a batch only if it
75+
// fits in full and otherwise rejects it with ErrQueueFull. A zero value
76+
// disables the cap. Intended to be called once at startup.
77+
func (s *SoloSequencer) SetMaxQueueBytes(n uint64) {
78+
s.mu.Lock()
79+
defer s.mu.Unlock()
80+
s.maxQueueBytes = n
81+
}
82+
5483
func (s *SoloSequencer) isValid(id []byte) bool {
5584
return bytes.Equal(s.id, id)
5685
}
@@ -67,7 +96,30 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su
6796
s.mu.Lock()
6897
defer s.mu.Unlock()
6998

99+
if s.maxQueueBytes == 0 {
100+
// Unbounded path (legacy). Suitable for tests and small
101+
// deployments; in production use SetMaxQueueBytes.
102+
s.queue = append(s.queue, req.Batch.Transactions...)
103+
return submitBatchResp, nil
104+
}
105+
106+
// All-or-nothing: if the whole incoming batch doesn't fit, reject
107+
// it untouched. Partial admission would force the caller (e.g.
108+
// the reaper bridging executor mempool → sequencer) to reason
109+
// about which prefix was admitted and re-feed only the suffix on
110+
// retry, which it doesn't currently do — leading to duplicate-tx
111+
// resubmission on each retry. Rejecting the whole batch lets the
112+
// reaper just retry with the same batch later when the queue has
113+
// drained.
114+
var batchBytes uint64
115+
for _, tx := range req.Batch.Transactions {
116+
batchBytes += uint64(len(tx))
117+
}
118+
if s.queueBytes+batchBytes > s.maxQueueBytes {
119+
return submitBatchResp, ErrQueueFull
120+
}
70121
s.queue = append(s.queue, req.Batch.Transactions...)
122+
s.queueBytes += batchBytes
71123
return submitBatchResp, nil
72124
}
73125

@@ -79,6 +131,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN
79131
s.mu.Lock()
80132
txs := s.queue
81133
s.queue = nil
134+
s.queueBytes = 0
82135
s.mu.Unlock()
83136

84137
if len(txs) == 0 {
@@ -122,6 +175,14 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN
122175
if len(postponedTxs) > 0 {
123176
s.mu.Lock()
124177
s.queue = append(postponedTxs, s.queue...)
178+
// Postponed txs were already in the queue's byte count when
179+
// SubmitBatchTxs admitted them. We zeroed queueBytes on drain
180+
// above, so re-queuing requires re-counting whatever survived.
181+
var requeuedBytes uint64
182+
for _, tx := range postponedTxs {
183+
requeuedBytes += uint64(len(tx))
184+
}
185+
s.queueBytes += requeuedBytes
125186
s.mu.Unlock()
126187
}
127188

tools/celestia-node-fiber/cmd/evnode-fibre/main.go

Lines changed: 43 additions & 16 deletions
@@ -327,6 +327,19 @@ func run(cli cliFlags) error {
327327

328328
executor := newInMemExecutor()
329329
sequencer := solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor)
330+
// Cap the sequencer's in-memory queue at 10× the per-block tx
331+
// budget. Above this, SubmitBatchTxs returns ErrQueueFull and the
332+
// runner's reaper-bridge / tx-ingress applies backpressure (txs
333+
// stay in the executor's txChan until the sequencer drains, and
334+
// the chan's bound makes /tx return 503). Without this cap a fast loadgen
335+
// (32 vCPU pushing >100 MB/s) outruns the 1 block/s drain and
336+
// the queue grows monotonically — observed pre-fix as 24 GB of
337+
// retained io.ReadAll bytes in heap snapshots before the daemon
338+
// hit the 64 GiB box ceiling and was OOM-killed.
339+
// Sized at 10× the per-block tx budget (matches SetMaxBlobSize
340+
// above; both anchor at the per-blob Fibre cap).
341+
const seqQueueBytes = 10 * 100 * 1024 * 1024 // 1 GiB
342+
sequencer.SetMaxQueueBytes(seqQueueBytes)
330343
daClient := block.NewFiberDAClient(adapter, cfg, logger, 0)
331344
p2pClient, err := p2p.NewClient(cfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), genesis.ChainID, logger, nil)
332345
if err != nil {
@@ -479,23 +492,23 @@ type inMemExecutor struct {
479492
totalTxs atomic.Uint64
480493
}
481494

482-
// txChan capacity caps in-flight memory: at 10 KB tx and 500 slots
483-
// we hold ≤ 5 MB queued before /tx blocks the ingress goroutine —
484-
// which is exactly the backpressure we want against a hot loadgen.
485-
// Reaper drains every 100 ms into the solo sequencer, which then
486-
// accumulates batches between block-production ticks; without a tight
487-
// cap a single block can balloon past the 120 MiB DA blob limit and
488-
// the rest of the daemon's per-block allocations push the box past
489-
// its RAM budget within seconds.
495+
// txChan capacity bounds the HTTP /tx ingest queue. Sized at 10K
496+
// slots (~100 MiB at 10 KB tx-size) so a 100 ms reaper cycle can
497+
// absorb a full max-size block's worth of txs without /tx blocking
498+
// the loadgen. Earlier we used 500 slots (~5 MiB) which forced
499+
// backpressure at ~5,000 tx/s — that turned txsim into the limiting
500+
// factor at ~22 MB/s rather than DA upload. With the per-block
501+
// FilterTxs cap (executor.go:RetrieveBatch via DefaultMaxBlobSize=
502+
// 100 MiB) and the submitter chunker now enforcing the actual blob
503+
// budget, the executor doesn't need an extra ingest-side cap.
490504
//
491-
// maxBlockTxs caps GetTxs's per-call return so reaper-cycle batches
492-
// are bounded too. With 500 ≤ 5 MB per block at 10 KB tx-size, we
493-
// stay an order of magnitude under the DA cap so headers/data signing
494-
// + envelope cache + retry buffers all fit.
505+
// maxBlockTxs caps GetTxs's per-call return; pairs with the channel
506+
// size so a reaper poll can fully drain a 100 MiB-block-worth of
507+
// queued txs in a single call instead of needing 20× cycles.
495508
func newInMemExecutor() *inMemExecutor {
496509
return &inMemExecutor{
497-
txChan: make(chan []byte, 500),
498-
maxBlockTxs: 500,
510+
txChan: make(chan []byte, 10000),
511+
maxBlockTxs: 10000,
499512
}
500513
}
501514

@@ -547,10 +560,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu
547560
return coreexecution.ExecutionInfo{MaxGas: 0}, nil
548561
}
549562

550-
func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
563+
// FilterTxs walks txs in arrival order and admits each one that fits the
564+
// remaining maxBytes budget (0 = no limit); the others are postponed to the
565+
// sequencer queue for a later batch. Skipping this enforcement (a previous version
566+
// returned FilterOK unconditionally) lets a single block sweep up the
567+
// entire mempool — under sustained txsim load that produced 369 MiB
568+
// blocks that exceeded Fibre's per-upload cap and crashed the node
569+
// with `single item exceeds DA blob size limit`.
570+
func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
551571
st := make([]coreexecution.FilterStatus, len(txs))
552-
for i := range st {
572+
var used uint64
573+
for i, tx := range txs {
574+
size := uint64(len(tx))
575+
if maxBytes > 0 && used+size > maxBytes {
576+
st[i] = coreexecution.FilterPostpone
577+
continue
578+
}
553579
st[i] = coreexecution.FilterOK
580+
used += size
554581
}
555582
return st, nil
556583
}
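
The budget loop above can be exercised on its own to see how it treats mixed tx sizes. In this sketch `status`, `statusOK`, `statusPostpone`, and `filterByBudget` are hypothetical stand-ins for the `coreexecution` types and the executor method; only the loop logic is taken from the diff:

```go
package main

import "fmt"

// status is a stand-in for coreexecution.FilterStatus.
type status int

const (
	statusOK status = iota
	statusPostpone
)

// filterByBudget is a hypothetical stand-alone version of the FilterTxs loop.
func filterByBudget(txs [][]byte, maxBytes uint64) []status {
	st := make([]status, len(txs))
	var used uint64
	for i, tx := range txs {
		size := uint64(len(tx))
		if maxBytes > 0 && used+size > maxBytes {
			st[i] = statusPostpone
			continue // keep scanning: a later, smaller tx may still fit
		}
		st[i] = statusOK
		used += size
	}
	return st
}

func main() {
	// Sizes 4, 8, 3 against a budget of 8 bytes.
	txs := [][]byte{make([]byte, 4), make([]byte, 8), make([]byte, 3)}
	fmt.Println(filterByBudget(txs, 8))
}
```

With sizes 4, 8, and 3 against a budget of 8, the first and third txs are admitted and the second is postponed. Because the loop uses `continue` rather than `break`, a postponed tx can be overtaken by a smaller one behind it; with uniformly sized txs, as in the txsim runs described here, the two behaviours coincide.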
