Skip to content

Commit bd4e754

Browse files
committed
fix: drain pending tx queue in batches with durable ack
Resolves slow pending-tx drain caused by one-queue-entry-per-block consumption and seen-on-enqueue dedup poisoning: - single sequencer drains multiple queue entries per block up to MaxBytes; WAL entries are deleted only on ack after block commit, with retry before the next batch and startup reconciliation against the last committed block (DropIncluded) - tx dedup moves from the 30-min cache-manager tx cache into the batch queue itself, keyed by sha256, released on ack; dead cache tx-seen subsystem removed - postponed txs are requeued durably during ack and stay deduped - Load cleans duplicate/stale WAL entries so restarts cannot resurrect committed txs - reaper submits one scrape per interval and notifies the executor only when new entries were actually queued (explicit pending count wiring, immune to tracing wrappers)
1 parent 89c7fe8 commit bd4e754

14 files changed

Lines changed: 1090 additions & 842 deletions

File tree

block/components.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ func newAggregatorComponents(
238238
// error channel for critical failures
239239
errorCh := make(chan error, 1)
240240

241+
// capture raw sequencer before tracing wrap for batch ack interface
242+
rawSequencer := sequencer
243+
241244
// wrap sequencer with tracing if enabled
242245
if config.Instrumentation.IsTracingEnabled() {
243246
sequencer = telemetry.WithTracingSequencer(sequencer)
@@ -273,19 +276,42 @@ func newAggregatorComponents(
273276
}
274277
pruner := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
275278

279+
// expose the raw sequencer's pending batch count so the reaper can
280+
// distinguish duplicate scrapes from newly queued entries, even when
281+
// the sequencer is wrapped with tracing
282+
type pendingBatchCounter interface {
283+
PendingBatchCount() int
284+
}
285+
var pendingBatchCount func() int
286+
if counter, ok := rawSequencer.(pendingBatchCounter); ok {
287+
pendingBatchCount = counter.PendingBatchCount
288+
}
289+
276290
reaper, err := reaping.NewReaper(
277291
exec,
278292
sequencer,
279293
genesis,
280294
logger,
281-
cacheManager,
282295
config.Node.ScrapeInterval.Duration,
283296
executor.NotifyNewTransactions,
297+
pendingBatchCount,
284298
)
285299
if err != nil {
286300
return nil, fmt.Errorf("failed to create reaper: %w", err)
287301
}
288302

303+
// wire batch ack callback so drained queue entries are committed after block commit
304+
type batchAcknowledger interface {
305+
AckBatch(ctx context.Context) error
306+
}
307+
if acker, ok := rawSequencer.(batchAcknowledger); ok {
308+
executor.SetOnBatchCommitted(acker.AckBatch)
309+
} else if !config.Node.BasedSequencer {
310+
// without an ack, drained queue entries are rolled back on every
311+
// retrieval and the same transactions would be re-included each block
312+
logger.Warn().Msg("sequencer does not implement AckBatch; drained batch entries will not be acknowledged after block commit")
313+
}
314+
289315
if config.Node.BasedSequencer { // no submissions needed for based sequencer
290316
return &Components{
291317
Executor: executor,

block/internal/cache/generic_cache.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,6 @@ func (c *Cache) isSeen(hash string) bool {
5858
return c.hashes[hash]
5959
}
6060

61-
// areSeen checks which hashes have been seen. Returns a boolean slice
62-
// parallel to the input where result[i] is true if hashes[i] is in the
63-
// cache. Acquires the read lock once for the entire batch.
64-
func (c *Cache) areSeen(hashes []string) []bool {
65-
c.mu.RLock()
66-
defer c.mu.RUnlock()
67-
result := make([]bool, len(hashes))
68-
for i, h := range hashes {
69-
result[i] = c.hashes[h]
70-
}
71-
return result
72-
}
73-
7461
func (c *Cache) setSeen(hash string, height uint64) {
7562
c.mu.Lock()
7663
defer c.mu.Unlock()
@@ -82,37 +69,6 @@ func (c *Cache) setSeen(hash string, height uint64) {
8269
c.hashByHeight[height] = hash
8370
}
8471

85-
func (c *Cache) removeSeen(hash string) {
86-
c.mu.Lock()
87-
defer c.mu.Unlock()
88-
delete(c.hashes, hash)
89-
}
90-
91-
// setSeenBatch marks all hashes as seen under a single write lock.
92-
// For height 0 (transactions), the hashByHeight bookkeeping is skipped
93-
// since all txs share the same sentinel height — the map lookup and
94-
// overwrite on every entry is pure overhead with no benefit.
95-
func (c *Cache) setSeenBatch(hashes []string, height uint64) {
96-
c.mu.Lock()
97-
defer c.mu.Unlock()
98-
if height == 0 {
99-
for _, h := range hashes {
100-
c.hashes[h] = true
101-
}
102-
return
103-
}
104-
105-
// currently not used, but there for completeness against setSeen
106-
for _, h := range hashes {
107-
if existing, ok := c.hashByHeight[height]; ok && existing == h {
108-
c.hashes[existing] = true
109-
continue
110-
}
111-
c.hashes[h] = true
112-
c.hashByHeight[height] = h
113-
}
114-
}
115-
11672
func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
11773
c.mu.RLock()
11874
defer c.mu.RUnlock()

block/internal/cache/generic_cache_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,6 @@ func TestCache_BasicOperations(t *testing.T) {
199199
assert.False(t, c.isSeen("hash1"))
200200
c.setSeen("hash1", 1)
201201
assert.True(t, c.isSeen("hash1"))
202-
c.removeSeen("hash1")
203-
assert.False(t, c.isSeen("hash1"))
204202

205203
_, ok := c.getDAIncluded("hash2")
206204
assert.False(t, ok)

block/internal/cache/manager.go

Lines changed: 1 addition & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/binary"
66
"fmt"
77
"sync"
8-
"time"
98

109
"github.com/rs/zerolog"
1110

@@ -24,10 +23,6 @@ const (
2423

2524
// DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking.
2625
DataDAIncludedPrefix = "cache/data-da-included/"
27-
28-
// DefaultTxCacheRetention is the default time to keep transaction hashes in cache.
29-
// Keeping a too high value can lead to OOM during heavy transaction load.
30-
DefaultTxCacheRetention = 30 * time.Minute
3126
)
3227

3328
// CacheManager provides thread-safe cache operations for tracking seen blocks
@@ -51,13 +46,6 @@ type CacheManager interface {
5146
SetDataDAIncluded(daCommitmentHash string, daHeight uint64, blockHeight uint64)
5247
RemoveDataDAIncluded(hash string)
5348

54-
// Transaction operations
55-
IsTxSeen(hash string) bool
56-
AreTxsSeen(hashes []string) []bool
57-
SetTxSeen(hash string)
58-
SetTxsSeen(hashes []string)
59-
CleanupOldTxs(olderThan time.Duration) int
60-
6149
// Pending events syncing coordination
6250
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
6351
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
@@ -94,8 +82,6 @@ var _ Manager = (*implementation)(nil)
9482
type implementation struct {
9583
headerCache *Cache
9684
dataCache *Cache
97-
txCache *Cache
98-
txTimestamps *sync.Map // map[string]time.Time
9985
pendingEvents map[uint64]*common.DAHeightEvent
10086
pendingMu sync.Mutex
10187
pendingHeaders *PendingHeaders
@@ -109,7 +95,6 @@ type implementation struct {
10995
func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) {
11096
headerCache := NewCache(st, HeaderDAIncludedPrefix)
11197
dataCache := NewCache(st, DataDAIncludedPrefix)
112-
txCache := NewCache(nil, "")
11398

11499
pendingHeaders, err := NewPendingHeaders(st, logger)
115100
if err != nil {
@@ -124,8 +109,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
124109
impl := &implementation{
125110
headerCache: headerCache,
126111
dataCache: dataCache,
127-
txCache: txCache,
128-
txTimestamps: new(sync.Map),
129112
pendingEvents: make(map[uint64]*common.DAHeightEvent),
130113
pendingHeaders: pendingHeaders,
131114
pendingData: pendingData,
@@ -202,59 +185,6 @@ func (m *implementation) RemoveDataDAIncluded(hash string) {
202185
m.dataCache.removeDAIncluded(hash)
203186
}
204187

205-
func (m *implementation) IsTxSeen(hash string) bool {
206-
return m.txCache.isSeen(hash)
207-
}
208-
209-
func (m *implementation) AreTxsSeen(hashes []string) []bool {
210-
return m.txCache.areSeen(hashes)
211-
}
212-
213-
func (m *implementation) SetTxSeen(hash string) {
214-
// Use 0 as height since transactions don't have a block height yet
215-
m.txCache.setSeen(hash, 0)
216-
// Track timestamp for cleanup purposes
217-
m.txTimestamps.Store(hash, time.Now())
218-
}
219-
220-
func (m *implementation) SetTxsSeen(hashes []string) {
221-
m.txCache.setSeenBatch(hashes, 0)
222-
now := time.Now()
223-
for _, hash := range hashes {
224-
m.txTimestamps.Store(hash, now)
225-
}
226-
}
227-
228-
// CleanupOldTxs removes transaction hashes older than olderThan and returns
229-
// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0.
230-
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
231-
if olderThan <= 0 {
232-
olderThan = DefaultTxCacheRetention
233-
}
234-
235-
cutoff := time.Now().Add(-olderThan)
236-
removed := 0
237-
238-
m.txTimestamps.Range(func(key, value any) bool {
239-
hash, ok := key.(string)
240-
if !ok {
241-
return true
242-
}
243-
timestamp, ok := value.(time.Time)
244-
if !ok {
245-
return true
246-
}
247-
if timestamp.Before(cutoff) {
248-
m.txCache.removeSeen(hash)
249-
m.txTimestamps.Delete(hash)
250-
removed++
251-
}
252-
return true
253-
})
254-
255-
return removed
256-
}
257-
258188
// DeleteHeight removes from all caches the given height.
259189
// This can be done when a height has been da included.
260190
func (m *implementation) DeleteHeight(blockHeight uint64) {
@@ -263,12 +193,6 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
263193
m.pendingMu.Lock()
264194
delete(m.pendingEvents, blockHeight)
265195
m.pendingMu.Unlock()
266-
267-
// Note: txCache is intentionally NOT deleted here because:
268-
// 1. Transactions are tracked by hash, not by block height (they use height 0)
269-
// 2. A transaction seen at one height may be resubmitted at a different height
270-
// 3. The cache prevents duplicate submissions across block heights
271-
// 4. Cleanup is handled separately via CleanupOldTxs() based on time, not height
272196
}
273197

274198
// Pending operations
@@ -363,7 +287,7 @@ func (m *implementation) SaveToStore() error {
363287
return fmt.Errorf("failed to save data cache to store: %w", err)
364288
}
365289

366-
// TX cache and pending events are ephemeral - not persisted
290+
// pending events are ephemeral - not persisted
367291
return nil
368292
}
369293

@@ -406,7 +330,6 @@ func (m *implementation) ClearFromStore() error {
406330

407331
m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix)
408332
m.dataCache = NewCache(m.store, DataDAIncludedPrefix)
409-
m.txCache = NewCache(nil, "")
410333
m.pendingEvents = make(map[uint64]*common.DAHeightEvent)
411334

412335
// Initialize DA height from store metadata to ensure DaHeight() is never 0.

0 commit comments

Comments
 (0)