Skip to content

Commit fdc79ad

Browse files
authored
perf(store): save metadata async (#3298)
* perf(store): save metadata async * cl * Optimize metadata writes with batching * feedback * De-duplicate batched writes by key in cached store * fix * updates
1 parent 0900dc5 commit fdc79ad

8 files changed

Lines changed: 368 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
### Changes
1313

1414
- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286)
15+
- Optimize metadata writes by making it async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298)
1516
- Reduce tx cache retention to avoid OOM under (really) heavy tx load [#3299](https://github.com/evstack/ev-node/pull/3299)
1617

1718
## v1.1.1

pkg/store/cached_store.go

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package store
22

33
import (
44
"context"
5+
"sync"
6+
"time"
57

68
lru "github.com/hashicorp/golang-lru/v2"
9+
"github.com/rs/zerolog"
710

811
"github.com/evstack/ev-node/types"
912
)
@@ -14,15 +17,37 @@ const (
1417

1518
// DefaultBlockDataCacheSize is the default number of block data entries to cache.
1619
DefaultBlockDataCacheSize = 200_000
20+
21+
asyncWriteBufferSize = 8192
22+
23+
// batchWindow is the time the write goroutine waits after receiving the first
24+
// op before flushing. This allows bursts of metadata writes (e.g. 3-4 per
25+
// height in the submitter) to be coalesced into a single Badger WriteBatch.
26+
batchWindow = 100 * time.Microsecond
1727
)
1828

29+
type asyncWriteOp struct {
30+
key string
31+
value []byte
32+
isDelete bool
33+
}
34+
1935
// CachedStore wraps a Store with LRU caching for frequently accessed data.
2036
// The underlying LRU cache is thread-safe, so no additional synchronization is needed.
37+
// Metadata writes (SetMetadata, DeleteMetadata) are processed asynchronously via a
38+
// buffered channel to avoid blocking Badger's write pipeline for critical operations
39+
// like block production (batch commits).
2140
type CachedStore struct {
2241
Store
2342

2443
headerCache *lru.Cache[uint64, *types.SignedHeader]
2544
blockDataCache *lru.Cache[uint64, *blockDataEntry]
45+
46+
writeCh chan asyncWriteOp
47+
done chan struct{}
48+
stopMu sync.RWMutex
49+
stopped bool
50+
logger zerolog.Logger
2651
}
2752

2853
type blockDataEntry struct {
@@ -73,6 +98,9 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error
7398
Store: store,
7499
headerCache: headerCache,
75100
blockDataCache: blockDataCache,
101+
writeCh: make(chan asyncWriteOp, asyncWriteBufferSize),
102+
done: make(chan struct{}),
103+
logger: zerolog.Nop(),
76104
}
77105

78106
for _, opt := range opts {
@@ -81,9 +109,58 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error
81109
}
82110
}
83111

112+
cs.startWriteLoop()
113+
84114
return cs, nil
85115
}
86116

117+
func (cs *CachedStore) startWriteLoop() {
118+
go func() {
119+
defer close(cs.done)
120+
for op := range cs.writeCh {
121+
ops := []asyncWriteOp{op}
122+
123+
timer := time.NewTimer(batchWindow)
124+
collect:
125+
for {
126+
select {
127+
case op, ok := <-cs.writeCh:
128+
if !ok {
129+
timer.Stop()
130+
break collect
131+
}
132+
ops = append(ops, op)
133+
case <-timer.C:
134+
break collect
135+
}
136+
}
137+
138+
last := make(map[string]asyncWriteOp, len(ops))
139+
for _, o := range ops {
140+
last[o.key] = o
141+
}
142+
143+
var puts []MetadataKV
144+
var deletes []string
145+
for _, o := range last {
146+
if o.isDelete {
147+
deletes = append(deletes, o.key)
148+
} else {
149+
puts = append(puts, MetadataKV{Key: o.key, Value: o.value})
150+
}
151+
}
152+
153+
if err := cs.BatchMetadata(context.Background(), puts, deletes); err != nil {
154+
for _, o := range ops {
155+
cs.logger.Error().Err(err).Str("key", o.key).
156+
Bool("delete", o.isDelete).
157+
Msg("async metadata batch write failed")
158+
}
159+
}
160+
}
161+
}()
162+
}
163+
87164
// GetHeader returns the header at the given height, using the cache if available.
88165
func (cs *CachedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) {
89166
// Try cache first
@@ -162,7 +239,7 @@ func (cs *CachedStore) Rollback(ctx context.Context, height uint64, aggregator b
162239
}
163240

164241
// PruneBlocks wraps the underlying store's PruneBlocks and invalidates caches
165-
// up to the heigh that we purne
242+
// up to the height that we prune
166243
func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error {
167244
if err := cs.Store.PruneBlocks(ctx, height); err != nil {
168245
return err
@@ -173,8 +250,42 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error {
173250
return nil
174251
}
175252

176-
// Close closes the underlying store.
253+
// SetMetadata queues an asynchronous metadata write. The write is persisted
254+
// by the background goroutine via BatchMetadata. If the store has been stopped,
255+
// the write falls back to synchronous execution on the underlying store.
256+
func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
257+
cs.stopMu.RLock()
258+
defer cs.stopMu.RUnlock()
259+
260+
if cs.stopped {
261+
return cs.Store.SetMetadata(ctx, key, value)
262+
}
263+
valueCopy := append([]byte(nil), value...)
264+
cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}
265+
return nil
266+
}
267+
268+
// DeleteMetadata queues an asynchronous metadata delete. If the store has been
269+
// stopped, the delete falls back to synchronous execution.
270+
func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error {
271+
cs.stopMu.RLock()
272+
defer cs.stopMu.RUnlock()
273+
274+
if cs.stopped {
275+
return cs.Store.DeleteMetadata(ctx, key)
276+
}
277+
cs.writeCh <- asyncWriteOp{key: key, isDelete: true}
278+
return nil
279+
}
280+
281+
// Close drains pending async writes, then closes the underlying store.
177282
func (cs *CachedStore) Close() error {
283+
cs.stopMu.Lock()
284+
cs.stopped = true
285+
close(cs.writeCh)
286+
cs.stopMu.Unlock()
287+
<-cs.done
288+
178289
cs.ClearCache()
179290
return cs.Store.Close()
180291
}

pkg/store/cached_store_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ package store
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
7+
"time"
68

9+
ds "github.com/ipfs/go-datastore"
10+
"github.com/rs/zerolog"
711
"github.com/stretchr/testify/assert"
812
"github.com/stretchr/testify/require"
913

@@ -270,3 +274,129 @@ func TestCachedStore_Close(t *testing.T) {
270274
err = cachedStore.Close()
271275
require.NoError(t, err)
272276
}
277+
278+
func TestCachedStore_AsyncSetMetadata(t *testing.T) {
279+
t.Parallel()
280+
ctx := context.Background()
281+
282+
kv, err := NewTestInMemoryKVStore()
283+
require.NoError(t, err)
284+
285+
base := New(kv)
286+
cs, err := NewCachedStore(base)
287+
require.NoError(t, err)
288+
t.Cleanup(func() { cs.Close() })
289+
290+
require.NoError(t, cs.SetMetadata(ctx, "key1", []byte("value1")))
291+
292+
require.Eventually(t, func() bool {
293+
v, err := base.GetMetadata(ctx, "key1")
294+
return err == nil && string(v) == "value1"
295+
}, time.Second, 10*time.Millisecond)
296+
}
297+
298+
func TestCachedStore_AsyncDeleteMetadata(t *testing.T) {
299+
t.Parallel()
300+
ctx := context.Background()
301+
302+
kv, err := NewTestInMemoryKVStore()
303+
require.NoError(t, err)
304+
305+
base := New(kv)
306+
require.NoError(t, base.SetMetadata(ctx, "key1", []byte("value1")))
307+
308+
cs, err := NewCachedStore(base)
309+
require.NoError(t, err)
310+
t.Cleanup(func() { cs.Close() })
311+
312+
require.NoError(t, cs.DeleteMetadata(ctx, "key1"))
313+
314+
require.Eventually(t, func() bool {
315+
_, err := base.GetMetadata(ctx, "key1")
316+
return err != nil
317+
}, time.Second, 10*time.Millisecond)
318+
}
319+
320+
func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) {
321+
ctx := context.Background()
322+
323+
dir := t.TempDir()
324+
kv, err := NewDefaultKVStore(dir, "", "test-db")
325+
require.NoError(t, err)
326+
327+
base := New(kv)
328+
cs, err := NewCachedStore(base)
329+
require.NoError(t, err)
330+
331+
const n = 100
332+
for i := range n {
333+
k := fmt.Sprintf("key-%d", i)
334+
require.NoError(t, cs.SetMetadata(ctx, k, []byte(k)))
335+
}
336+
337+
require.NoError(t, cs.Close())
338+
339+
kv2, err := NewDefaultKVStore(dir, "", "test-db")
340+
require.NoError(t, err)
341+
t.Cleanup(func() { kv2.Close() })
342+
reopened := New(kv2)
343+
344+
for i := range n {
345+
k := fmt.Sprintf("key-%d", i)
346+
v, err := reopened.GetMetadata(ctx, k)
347+
require.NoError(t, err)
348+
require.Equal(t, []byte(k), v)
349+
}
350+
}
351+
352+
func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) {
353+
kv, err := NewTestInMemoryKVStore()
354+
require.NoError(t, err)
355+
356+
base := New(kv)
357+
cs, err := NewCachedStore(base)
358+
require.NoError(t, err)
359+
360+
ctx := context.Background()
361+
require.NoError(t, cs.SetMetadata(ctx, "before", []byte("ok")))
362+
363+
require.NoError(t, cs.Close())
364+
365+
err = cs.SetMetadata(ctx, "after", []byte("sync"))
366+
require.Error(t, err)
367+
}
368+
369+
func TestCachedStore_CoalescesSameKeyOps(t *testing.T) {
370+
ctx := context.Background()
371+
372+
kv, err := NewTestInMemoryKVStore()
373+
require.NoError(t, err)
374+
375+
require.NoError(t, kv.Put(ctx, ds.NewKey(GetMetaKey("k")), []byte("original")))
376+
377+
base := New(kv)
378+
379+
writeCh := make(chan asyncWriteOp, asyncWriteBufferSize)
380+
done := make(chan struct{})
381+
cs := &CachedStore{
382+
Store: base,
383+
writeCh: writeCh,
384+
done: done,
385+
logger: zerolog.Nop(),
386+
}
387+
cs.startWriteLoop()
388+
389+
require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v1")))
390+
require.NoError(t, cs.DeleteMetadata(ctx, "k"))
391+
require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v2")))
392+
393+
cs.stopMu.Lock()
394+
cs.stopped = true
395+
close(writeCh)
396+
cs.stopMu.Unlock()
397+
<-done
398+
399+
v, err := base.GetMetadata(ctx, "k")
400+
require.NoError(t, err)
401+
require.Equal(t, []byte("v2"), v, "last write (Set) should win over delete")
402+
}

pkg/store/store.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,30 @@ func (s *DefaultStore) SetMetadata(ctx context.Context, key string, value []byte
190190
return nil
191191
}
192192

193+
func (s *DefaultStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error {
194+
if len(puts) == 0 && len(deletes) == 0 {
195+
return nil
196+
}
197+
batch, err := s.db.Batch(ctx)
198+
if err != nil {
199+
return fmt.Errorf("failed to create metadata batch: %w", err)
200+
}
201+
for _, kv := range puts {
202+
if err := batch.Put(ctx, ds.NewKey(GetMetaKey(kv.Key)), kv.Value); err != nil {
203+
return fmt.Errorf("failed to batch-put metadata key '%s': %w", kv.Key, err)
204+
}
205+
}
206+
for _, key := range deletes {
207+
if err := batch.Delete(ctx, ds.NewKey(GetMetaKey(key))); err != nil {
208+
return fmt.Errorf("failed to batch-delete metadata key '%s': %w", key, err)
209+
}
210+
}
211+
if err := batch.Commit(ctx); err != nil {
212+
return fmt.Errorf("failed to commit metadata batch: %w", err)
213+
}
214+
return nil
215+
}
216+
193217
// GetMetadata returns values stored for given key with SetMetadata.
194218
func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, error) {
195219
data, err := s.db.Get(ctx, ds.NewKey(GetMetaKey(key)))

pkg/store/tracing.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,25 @@ func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error {
211211
return nil
212212
}
213213

214+
func (t *tracedStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error {
215+
ctx, span := t.tracer.Start(ctx, "Store.BatchMetadata",
216+
trace.WithAttributes(
217+
attribute.Int("puts", len(puts)),
218+
attribute.Int("deletes", len(deletes)),
219+
),
220+
)
221+
defer span.End()
222+
223+
err := t.inner.BatchMetadata(ctx, puts, deletes)
224+
if err != nil {
225+
span.RecordError(err)
226+
span.SetStatus(codes.Error, err.Error())
227+
return err
228+
}
229+
230+
return nil
231+
}
232+
214233
func (t *tracedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error {
215234
ctx, span := t.tracer.Start(ctx, "Store.DeleteStateAtHeight",
216235
trace.WithAttributes(attribute.Int64("height", int64(height))),

0 commit comments

Comments
 (0)