Skip to content

Commit 85119a6

Browse files
authored
fix: drain pending tx queue in batches with durable ack (#3351)
* fix: drain pending tx queue in batches with durable ack Resolves slow pending-tx drain caused by one-queue-entry-per-block consumption and seen-on-enqueue dedup poisoning: - single sequencer drains multiple queue entries per block up to MaxBytes; WAL entries are deleted only on ack after block commit, with retry before the next batch and startup reconciliation against the last committed block (DropIncluded) - tx dedup moves from the 30-min cache-manager tx cache into the batch queue itself, keyed by sha256, released on ack; dead cache tx-seen subsystem removed - postponed txs are requeued durably during ack and stay deduped - Load cleans duplicate/stale WAL entries so restarts cannot resurrect committed txs - reaper submits one scrape per interval and notifies the executor only when new entries were actually queued (explicit pending count wiring, immune to tracing wrappers) * fix: address review findings in batch queue drain/ack path - defer postponed entry requeue until ack fully succeeds so a drain rollback after a failed ack neither loses nor duplicates postponed txs - replace fmt.Printf with structured logging in BatchQueue - use monotonic enqueue counter in reaper to detect new submissions race-free against concurrent drain/ack * chore: fix import grouping in reaping bench test * fix: address PR review feedback on batch queue durability - fail Load on datastore read errors instead of dropping txs - detach rollback WAL cleanup from drain context so it runs on shutdown - raise queue load timeout for large WAL backlogs - merge postponed-tx collection into the filter loop - document DropIncluded aliasing, SetPostponed contract, txSeen bound - add tests: executor ack retry, reconcile crash recovery, bulk-prepend rollback * chore: addressing PR feedback * chore: pr feedback
1 parent 3fed700 commit 85119a6

17 files changed

Lines changed: 1286 additions & 869 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
## [Unreleased]
1111

12+
### Fixed
13+
14+
- Drain the pending tx queue in merged batches with a durable WAL-backed ack, fixing severe queue backlog under heavy tx load. Tx dedup moved from the reaper cache into the sequencer queue [#3351](https://github.com/evstack/ev-node/pull/3351)
15+
1216
## v1.1.2
1317

1418
### Changes

block/components.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ func newAggregatorComponents(
278278
sequencer,
279279
genesis,
280280
logger,
281-
cacheManager,
282281
config.Node.ScrapeInterval.Duration,
283282
executor.NotifyNewTransactions,
284283
)

block/internal/cache/generic_cache.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,6 @@ func (c *Cache) isSeen(hash string) bool {
5858
return c.hashes[hash]
5959
}
6060

61-
// areSeen checks which hashes have been seen. Returns a boolean slice
62-
// parallel to the input where result[i] is true if hashes[i] is in the
63-
// cache. Acquires the read lock once for the entire batch.
64-
func (c *Cache) areSeen(hashes []string) []bool {
65-
c.mu.RLock()
66-
defer c.mu.RUnlock()
67-
result := make([]bool, len(hashes))
68-
for i, h := range hashes {
69-
result[i] = c.hashes[h]
70-
}
71-
return result
72-
}
73-
7461
func (c *Cache) setSeen(hash string, height uint64) {
7562
c.mu.Lock()
7663
defer c.mu.Unlock()
@@ -82,37 +69,6 @@ func (c *Cache) setSeen(hash string, height uint64) {
8269
c.hashByHeight[height] = hash
8370
}
8471

85-
func (c *Cache) removeSeen(hash string) {
86-
c.mu.Lock()
87-
defer c.mu.Unlock()
88-
delete(c.hashes, hash)
89-
}
90-
91-
// setSeenBatch marks all hashes as seen under a single write lock.
92-
// For height 0 (transactions), the hashByHeight bookkeeping is skipped
93-
// since all txs share the same sentinel height — the map lookup and
94-
// overwrite on every entry is pure overhead with no benefit.
95-
func (c *Cache) setSeenBatch(hashes []string, height uint64) {
96-
c.mu.Lock()
97-
defer c.mu.Unlock()
98-
if height == 0 {
99-
for _, h := range hashes {
100-
c.hashes[h] = true
101-
}
102-
return
103-
}
104-
105-
// currently not used, but there for completeness against setSeen
106-
for _, h := range hashes {
107-
if existing, ok := c.hashByHeight[height]; ok && existing == h {
108-
c.hashes[existing] = true
109-
continue
110-
}
111-
c.hashes[h] = true
112-
c.hashByHeight[height] = h
113-
}
114-
}
115-
11672
func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
11773
c.mu.RLock()
11874
defer c.mu.RUnlock()

block/internal/cache/generic_cache_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,6 @@ func TestCache_BasicOperations(t *testing.T) {
199199
assert.False(t, c.isSeen("hash1"))
200200
c.setSeen("hash1", 1)
201201
assert.True(t, c.isSeen("hash1"))
202-
c.removeSeen("hash1")
203-
assert.False(t, c.isSeen("hash1"))
204202

205203
_, ok := c.getDAIncluded("hash2")
206204
assert.False(t, ok)

block/internal/cache/manager.go

Lines changed: 1 addition & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/binary"
66
"fmt"
77
"sync"
8-
"time"
98

109
"github.com/rs/zerolog"
1110

@@ -24,10 +23,6 @@ const (
2423

2524
// DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking.
2625
DataDAIncludedPrefix = "cache/data-da-included/"
27-
28-
// DefaultTxCacheRetention is the default time to keep transaction hashes in cache.
29-
// Keeping a too high value can lead to OOM during heavy transaction load.
30-
DefaultTxCacheRetention = 30 * time.Minute
3126
)
3227

3328
// CacheManager provides thread-safe cache operations for tracking seen blocks
@@ -51,13 +46,6 @@ type CacheManager interface {
5146
SetDataDAIncluded(daCommitmentHash string, daHeight uint64, blockHeight uint64)
5247
RemoveDataDAIncluded(hash string)
5348

54-
// Transaction operations
55-
IsTxSeen(hash string) bool
56-
AreTxsSeen(hashes []string) []bool
57-
SetTxSeen(hash string)
58-
SetTxsSeen(hashes []string)
59-
CleanupOldTxs(olderThan time.Duration) int
60-
6149
// Pending events syncing coordination
6250
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
6351
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
@@ -94,8 +82,6 @@ var _ Manager = (*implementation)(nil)
9482
type implementation struct {
9583
headerCache *Cache
9684
dataCache *Cache
97-
txCache *Cache
98-
txTimestamps *sync.Map // map[string]time.Time
9985
pendingEvents map[uint64]*common.DAHeightEvent
10086
pendingMu sync.Mutex
10187
pendingHeaders *PendingHeaders
@@ -109,7 +95,6 @@ type implementation struct {
10995
func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) {
11096
headerCache := NewCache(st, HeaderDAIncludedPrefix)
11197
dataCache := NewCache(st, DataDAIncludedPrefix)
112-
txCache := NewCache(nil, "")
11398

11499
pendingHeaders, err := NewPendingHeaders(st, logger)
115100
if err != nil {
@@ -124,8 +109,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
124109
impl := &implementation{
125110
headerCache: headerCache,
126111
dataCache: dataCache,
127-
txCache: txCache,
128-
txTimestamps: new(sync.Map),
129112
pendingEvents: make(map[uint64]*common.DAHeightEvent),
130113
pendingHeaders: pendingHeaders,
131114
pendingData: pendingData,
@@ -202,59 +185,6 @@ func (m *implementation) RemoveDataDAIncluded(hash string) {
202185
m.dataCache.removeDAIncluded(hash)
203186
}
204187

205-
func (m *implementation) IsTxSeen(hash string) bool {
206-
return m.txCache.isSeen(hash)
207-
}
208-
209-
func (m *implementation) AreTxsSeen(hashes []string) []bool {
210-
return m.txCache.areSeen(hashes)
211-
}
212-
213-
func (m *implementation) SetTxSeen(hash string) {
214-
// Use 0 as height since transactions don't have a block height yet
215-
m.txCache.setSeen(hash, 0)
216-
// Track timestamp for cleanup purposes
217-
m.txTimestamps.Store(hash, time.Now())
218-
}
219-
220-
func (m *implementation) SetTxsSeen(hashes []string) {
221-
m.txCache.setSeenBatch(hashes, 0)
222-
now := time.Now()
223-
for _, hash := range hashes {
224-
m.txTimestamps.Store(hash, now)
225-
}
226-
}
227-
228-
// CleanupOldTxs removes transaction hashes older than olderThan and returns
229-
// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0.
230-
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
231-
if olderThan <= 0 {
232-
olderThan = DefaultTxCacheRetention
233-
}
234-
235-
cutoff := time.Now().Add(-olderThan)
236-
removed := 0
237-
238-
m.txTimestamps.Range(func(key, value any) bool {
239-
hash, ok := key.(string)
240-
if !ok {
241-
return true
242-
}
243-
timestamp, ok := value.(time.Time)
244-
if !ok {
245-
return true
246-
}
247-
if timestamp.Before(cutoff) {
248-
m.txCache.removeSeen(hash)
249-
m.txTimestamps.Delete(hash)
250-
removed++
251-
}
252-
return true
253-
})
254-
255-
return removed
256-
}
257-
258188
// DeleteHeight removes from all caches the given height.
259189
// This can be done when a height has been da included.
260190
func (m *implementation) DeleteHeight(blockHeight uint64) {
@@ -263,12 +193,6 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
263193
m.pendingMu.Lock()
264194
delete(m.pendingEvents, blockHeight)
265195
m.pendingMu.Unlock()
266-
267-
// Note: txCache is intentionally NOT deleted here because:
268-
// 1. Transactions are tracked by hash, not by block height (they use height 0)
269-
// 2. A transaction seen at one height may be resubmitted at a different height
270-
// 3. The cache prevents duplicate submissions across block heights
271-
// 4. Cleanup is handled separately via CleanupOldTxs() based on time, not height
272196
}
273197

274198
// Pending operations
@@ -363,7 +287,7 @@ func (m *implementation) SaveToStore() error {
363287
return fmt.Errorf("failed to save data cache to store: %w", err)
364288
}
365289

366-
// TX cache and pending events are ephemeral - not persisted
290+
// pending events are ephemeral - not persisted
367291
return nil
368292
}
369293

@@ -406,7 +330,6 @@ func (m *implementation) ClearFromStore() error {
406330

407331
m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix)
408332
m.dataCache = NewCache(m.store, DataDAIncludedPrefix)
409-
m.txCache = NewCache(nil, "")
410333
m.pendingEvents = make(map[uint64]*common.DAHeightEvent)
411334

412335
// Initialize DA height from store metadata to ensure DaHeight() is never 0.

0 commit comments

Comments
 (0)