Skip to content

Commit f7ea4ad

Browse files
committed
perf(store): save metadata async
1 parent e83cf6c commit f7ea4ad

2 files changed

Lines changed: 171 additions & 1 deletion

File tree

pkg/store/cached_store.go

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package store
22

33
import (
44
"context"
5+
"sync"
56

67
lru "github.com/hashicorp/golang-lru/v2"
8+
"github.com/rs/zerolog"
79

810
"github.com/evstack/ev-node/types"
911
)
@@ -14,15 +16,32 @@ const (
1416

1517
// DefaultBlockDataCacheSize is the default number of block data entries to cache.
1618
DefaultBlockDataCacheSize = 200_000
19+
20+
asyncWriteBufferSize = 8192
1721
)
1822

23+
type asyncWriteOp struct {
24+
key string
25+
value []byte
26+
isDelete bool
27+
}
28+
1929
// CachedStore wraps a Store with LRU caching for frequently accessed data.
2030
// The underlying LRU cache is thread-safe, so no additional synchronization is needed.
31+
// Metadata writes (SetMetadata, DeleteMetadata) are processed asynchronously via a
32+
// buffered channel to avoid blocking Badger's write pipeline for critical operations
33+
// like block production (batch commits).
2134
type CachedStore struct {
2235
Store
2336

2437
headerCache *lru.Cache[uint64, *types.SignedHeader]
2538
blockDataCache *lru.Cache[uint64, *blockDataEntry]
39+
40+
writeCh chan asyncWriteOp
41+
done chan struct{}
42+
stopMu sync.RWMutex
43+
stopped bool
44+
logger zerolog.Logger
2645
}
2746

2847
type blockDataEntry struct {
@@ -73,6 +92,9 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error
7392
Store: store,
7493
headerCache: headerCache,
7594
blockDataCache: blockDataCache,
95+
writeCh: make(chan asyncWriteOp, asyncWriteBufferSize),
96+
done: make(chan struct{}),
97+
logger: zerolog.Nop(),
7698
}
7799

78100
for _, opt := range opts {
@@ -81,9 +103,30 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error
81103
}
82104
}
83105

106+
cs.startWriteLoop()
107+
84108
return cs, nil
85109
}
86110

111+
func (cs *CachedStore) startWriteLoop() {
112+
go func() {
113+
defer close(cs.done)
114+
for op := range cs.writeCh {
115+
var err error
116+
if op.isDelete {
117+
err = cs.Store.DeleteMetadata(context.Background(), op.key)
118+
} else {
119+
err = cs.Store.SetMetadata(context.Background(), op.key, op.value)
120+
}
121+
if err != nil {
122+
cs.logger.Error().Err(err).Str("key", op.key).
123+
Bool("delete", op.isDelete).
124+
Msg("async metadata write failed")
125+
}
126+
}
127+
}()
128+
}
129+
87130
// GetHeader returns the header at the given height, using the cache if available.
88131
func (cs *CachedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) {
89132
// Try cache first
@@ -173,8 +216,51 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error {
173216
return nil
174217
}
175218

176-
// Close closes the underlying store.
219+
// SetMetadata queues an asynchronous metadata write. The write is persisted
220+
// by the background goroutine. If the buffer is full or the store has been
221+
// stopped, the write falls back to synchronous execution on the underlying store.
222+
func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
223+
cs.stopMu.RLock()
224+
if cs.stopped {
225+
cs.stopMu.RUnlock()
226+
return cs.Store.SetMetadata(ctx, key, value)
227+
}
228+
select {
229+
case cs.writeCh <- asyncWriteOp{key: key, value: value}:
230+
cs.stopMu.RUnlock()
231+
return nil
232+
default:
233+
cs.stopMu.RUnlock()
234+
return cs.Store.SetMetadata(ctx, key, value)
235+
}
236+
}
237+
238+
// DeleteMetadata queues an asynchronous metadata delete. If the buffer is full
239+
// or the store has been stopped, the delete falls back to synchronous execution.
240+
func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error {
241+
cs.stopMu.RLock()
242+
if cs.stopped {
243+
cs.stopMu.RUnlock()
244+
return cs.Store.DeleteMetadata(ctx, key)
245+
}
246+
select {
247+
case cs.writeCh <- asyncWriteOp{key: key, isDelete: true}:
248+
cs.stopMu.RUnlock()
249+
return nil
250+
default:
251+
cs.stopMu.RUnlock()
252+
return cs.Store.DeleteMetadata(ctx, key)
253+
}
254+
}
255+
256+
// Close drains pending async writes, then closes the underlying store.
177257
func (cs *CachedStore) Close() error {
258+
cs.stopMu.Lock()
259+
cs.stopped = true
260+
close(cs.writeCh)
261+
cs.stopMu.Unlock()
262+
<-cs.done
263+
178264
cs.ClearCache()
179265
return cs.Store.Close()
180266
}

pkg/store/cached_store_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package store
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
"github.com/stretchr/testify/assert"
89
"github.com/stretchr/testify/require"
@@ -270,3 +271,86 @@ func TestCachedStore_Close(t *testing.T) {
270271
err = cachedStore.Close()
271272
require.NoError(t, err)
272273
}
274+
275+
func TestCachedStore_AsyncSetMetadata(t *testing.T) {
276+
t.Parallel()
277+
ctx := context.Background()
278+
279+
kv, err := NewTestInMemoryKVStore()
280+
require.NoError(t, err)
281+
282+
base := New(kv)
283+
cs, err := NewCachedStore(base)
284+
require.NoError(t, err)
285+
t.Cleanup(func() { cs.Close() })
286+
287+
require.NoError(t, cs.SetMetadata(ctx, "key1", []byte("value1")))
288+
289+
require.Eventually(t, func() bool {
290+
v, err := base.GetMetadata(ctx, "key1")
291+
return err == nil && string(v) == "value1"
292+
}, time.Second, 10*time.Millisecond)
293+
}
294+
295+
func TestCachedStore_AsyncDeleteMetadata(t *testing.T) {
296+
t.Parallel()
297+
ctx := context.Background()
298+
299+
kv, err := NewTestInMemoryKVStore()
300+
require.NoError(t, err)
301+
302+
base := New(kv)
303+
require.NoError(t, base.SetMetadata(ctx, "key1", []byte("value1")))
304+
305+
cs, err := NewCachedStore(base)
306+
require.NoError(t, err)
307+
t.Cleanup(func() { cs.Close() })
308+
309+
require.NoError(t, cs.DeleteMetadata(ctx, "key1"))
310+
311+
require.Eventually(t, func() bool {
312+
_, err := base.GetMetadata(ctx, "key1")
313+
return err != nil
314+
}, time.Second, 10*time.Millisecond)
315+
}
316+
317+
func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) {
318+
kv, err := NewTestInMemoryKVStore()
319+
require.NoError(t, err)
320+
321+
base := New(kv)
322+
cs, err := NewCachedStore(base)
323+
require.NoError(t, err)
324+
325+
ctx := context.Background()
326+
const n = 100
327+
for i := 0; i < n; i++ {
328+
k := []byte{byte(i)}
329+
require.NoError(t, cs.SetMetadata(ctx, string(k), k))
330+
}
331+
332+
// Wait for the last key to land via pass-through read
333+
require.Eventually(t, func() bool {
334+
v, err := cs.GetMetadata(ctx, string([]byte{byte(n - 1)}))
335+
return err == nil && len(v) == 1 && v[0] == byte(n-1)
336+
}, 2*time.Second, 10*time.Millisecond)
337+
338+
require.NoError(t, cs.Close())
339+
}
340+
341+
func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) {
342+
kv, err := NewTestInMemoryKVStore()
343+
require.NoError(t, err)
344+
345+
base := New(kv)
346+
cs, err := NewCachedStore(base)
347+
require.NoError(t, err)
348+
349+
ctx := context.Background()
350+
require.NoError(t, cs.SetMetadata(ctx, "before", []byte("ok")))
351+
352+
require.NoError(t, cs.Close())
353+
354+
err = cs.SetMetadata(ctx, "after", []byte("sync"))
355+
require.Error(t, err)
356+
}

0 commit comments

Comments
 (0)