Skip to content

Commit 47c84a0

Browse files
committed
refactor(syncing): simplify double-sign detection by removing evidence persistence and checking against applied headers
1 parent ab15875 commit 47c84a0

19 files changed

Lines changed: 476 additions & 2116 deletions

block/internal/cache/manager.go

Lines changed: 21 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ type CacheManager interface {
4444
SetHeaderDAIncluded(hash string, daHeight uint64, blockHeight uint64)
4545
RemoveHeaderDAIncluded(hash string)
4646

47-
// Pending signed header operations (in-flight, pre-persistence)
48-
SetPendingSignedHeader(h *types.SignedHeader, source string)
49-
GetPendingSignedHeader(blockHeight uint64) (*types.SignedHeader, string, bool)
50-
RemovePendingSignedHeader(blockHeight uint64)
51-
5247
// Data operations
5348
IsDataSeen(hash string) bool
5449
SetDataSeen(hash string, blockHeight uint64)
@@ -98,24 +93,17 @@ type Manager interface {
9893
var _ Manager = (*implementation)(nil)
9994

10095
type implementation struct {
101-
headerCache *Cache
102-
dataCache *Cache
103-
txCache *Cache
104-
txTimestamps *sync.Map // map[string]time.Time
105-
pendingEvents map[uint64]*common.DAHeightEvent
106-
pendingMu sync.Mutex
107-
pendingHeaders *PendingHeaders
108-
pendingData *PendingData
109-
pendingSignedHeaders map[uint64]pendingSignedHeader
110-
pendingSignedHeadersMu sync.RWMutex
111-
store store.Store
112-
config config.Config
113-
logger zerolog.Logger
114-
}
115-
116-
type pendingSignedHeader struct {
117-
header *types.SignedHeader
118-
source string
96+
headerCache *Cache
97+
dataCache *Cache
98+
txCache *Cache
99+
txTimestamps *sync.Map // map[string]time.Time
100+
pendingEvents map[uint64]*common.DAHeightEvent
101+
pendingMu sync.Mutex
102+
pendingHeaders *PendingHeaders
103+
pendingData *PendingData
104+
store store.Store
105+
config config.Config
106+
logger zerolog.Logger
119107
}
120108

121109
// NewManager creates a new Manager, restoring or clearing persisted state as configured.
@@ -135,17 +123,16 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
135123
}
136124

137125
impl := &implementation{
138-
headerCache: headerCache,
139-
dataCache: dataCache,
140-
txCache: txCache,
141-
txTimestamps: new(sync.Map),
142-
pendingEvents: make(map[uint64]*common.DAHeightEvent),
143-
pendingHeaders: pendingHeaders,
144-
pendingData: pendingData,
145-
pendingSignedHeaders: make(map[uint64]pendingSignedHeader),
146-
store: st,
147-
config: cfg,
148-
logger: logger,
126+
headerCache: headerCache,
127+
dataCache: dataCache,
128+
txCache: txCache,
129+
txTimestamps: new(sync.Map),
130+
pendingEvents: make(map[uint64]*common.DAHeightEvent),
131+
pendingHeaders: pendingHeaders,
132+
pendingData: pendingData,
133+
store: st,
134+
config: cfg,
135+
logger: logger,
149136
}
150137

151138
if cfg.ClearCache {
@@ -192,42 +179,6 @@ func (m *implementation) RemoveHeaderDAIncluded(hash string) {
192179
m.headerCache.removeDAIncluded(hash)
193180
}
194181

195-
// SetPendingSignedHeader records the first SignedHeader seen at this height.
196-
// First-write-wins: later writes at the same height are ignored so the
197-
// double-sign detector can match alternates against the original observation.
198-
func (m *implementation) SetPendingSignedHeader(h *types.SignedHeader, source string) {
199-
if h == nil {
200-
return
201-
}
202-
height := h.Height()
203-
m.pendingSignedHeadersMu.Lock()
204-
defer m.pendingSignedHeadersMu.Unlock()
205-
if _, exists := m.pendingSignedHeaders[height]; exists {
206-
return
207-
}
208-
m.pendingSignedHeaders[height] = pendingSignedHeader{header: h, source: source}
209-
}
210-
211-
// GetPendingSignedHeader returns the first-seen SignedHeader and the source
212-
// ("da" or "p2p") it was observed from.
213-
func (m *implementation) GetPendingSignedHeader(blockHeight uint64) (*types.SignedHeader, string, bool) {
214-
m.pendingSignedHeadersMu.RLock()
215-
defer m.pendingSignedHeadersMu.RUnlock()
216-
entry, ok := m.pendingSignedHeaders[blockHeight]
217-
if !ok {
218-
return nil, "", false
219-
}
220-
return entry.header, entry.source, true
221-
}
222-
223-
// RemovePendingSignedHeader evicts the entry once the height is persisted, so
224-
// the store becomes the authoritative source for double-sign comparison.
225-
func (m *implementation) RemovePendingSignedHeader(blockHeight uint64) {
226-
m.pendingSignedHeadersMu.Lock()
227-
delete(m.pendingSignedHeaders, blockHeight)
228-
m.pendingSignedHeadersMu.Unlock()
229-
}
230-
231182
// DaHeight returns the highest DA height seen across header and data caches.
232183
func (m *implementation) DaHeight() uint64 {
233184
return max(m.headerCache.daHeight(), m.dataCache.daHeight())
@@ -318,7 +269,6 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
318269
m.pendingMu.Lock()
319270
delete(m.pendingEvents, blockHeight)
320271
m.pendingMu.Unlock()
321-
m.RemovePendingSignedHeader(blockHeight)
322272

323273
// Note: txCache is intentionally NOT deleted here because:
324274
// 1. Transactions are tracked by hash, not by block height (they use height 0)
@@ -464,9 +414,6 @@ func (m *implementation) ClearFromStore() error {
464414
m.dataCache = NewCache(m.store, DataDAIncludedPrefix)
465415
m.txCache = NewCache(nil, "")
466416
m.pendingEvents = make(map[uint64]*common.DAHeightEvent)
467-
m.pendingSignedHeadersMu.Lock()
468-
m.pendingSignedHeaders = make(map[uint64]pendingSignedHeader)
469-
m.pendingSignedHeadersMu.Unlock()
470417

471418
// Initialize DA height from store metadata to ensure DaHeight() is never 0.
472419
m.initDAHeightFromStore(ctx)

block/internal/cache/manager_test.go

Lines changed: 0 additions & 201 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package cache
33
import (
44
"context"
55
"encoding/binary"
6-
"sync"
7-
"sync/atomic"
86
"testing"
97
"time"
108

@@ -520,205 +518,6 @@ func TestManager_DaHeightAfterCacheClear(t *testing.T) {
520518
"DaHeight should be seeded from finalized-tip metadata even after ClearCache")
521519
}
522520

523-
// builds a minimal SignedHeader; variant differentiates hashes at the same height.
524-
func signedHeaderForHeight(height uint64, variant byte) *types.SignedHeader {
525-
return &types.SignedHeader{
526-
Header: types.Header{
527-
BaseHeader: types.BaseHeader{ChainID: "pending-signed", Height: height, Time: 1},
528-
AppHash: []byte{variant, variant, variant},
529-
},
530-
}
531-
}
532-
533-
func TestManager_PendingSignedHeader_FirstWriteWins(t *testing.T) {
534-
t.Parallel()
535-
cfg := tempConfig(t)
536-
st := testMemStore(t)
537-
538-
m, err := NewManager(cfg, st, zerolog.Nop())
539-
require.NoError(t, err)
540-
541-
first := signedHeaderForHeight(5, 0x01)
542-
second := signedHeaderForHeight(5, 0x02)
543-
require.NotEqual(t, first.Hash().String(), second.Hash().String())
544-
545-
m.SetPendingSignedHeader(first, "p2p")
546-
m.SetPendingSignedHeader(second, "da")
547-
548-
got, source, ok := m.GetPendingSignedHeader(5)
549-
require.True(t, ok)
550-
require.Equal(t, first.Hash().String(), got.Hash().String())
551-
require.Equal(t, "p2p", source)
552-
}
553-
554-
func TestManager_PendingSignedHeader_NilHeaderIgnored(t *testing.T) {
555-
t.Parallel()
556-
cfg := tempConfig(t)
557-
st := testMemStore(t)
558-
559-
m, err := NewManager(cfg, st, zerolog.Nop())
560-
require.NoError(t, err)
561-
562-
m.SetPendingSignedHeader(nil, "p2p")
563-
_, _, ok := m.GetPendingSignedHeader(5)
564-
require.False(t, ok)
565-
566-
hdr := signedHeaderForHeight(5, 0x01)
567-
m.SetPendingSignedHeader(hdr, "p2p")
568-
got, _, ok := m.GetPendingSignedHeader(5)
569-
require.True(t, ok)
570-
require.Equal(t, hdr.Hash().String(), got.Hash().String())
571-
}
572-
573-
func TestManager_GetPendingSignedHeader_Miss(t *testing.T) {
574-
t.Parallel()
575-
cfg := tempConfig(t)
576-
st := testMemStore(t)
577-
578-
m, err := NewManager(cfg, st, zerolog.Nop())
579-
require.NoError(t, err)
580-
581-
hdr, source, ok := m.GetPendingSignedHeader(99)
582-
require.False(t, ok)
583-
require.Nil(t, hdr)
584-
require.Empty(t, source)
585-
}
586-
587-
func TestManager_RemovePendingSignedHeader_Idempotent(t *testing.T) {
588-
t.Parallel()
589-
cfg := tempConfig(t)
590-
st := testMemStore(t)
591-
592-
m, err := NewManager(cfg, st, zerolog.Nop())
593-
require.NoError(t, err)
594-
595-
require.NotPanics(t, func() { m.RemovePendingSignedHeader(123) })
596-
597-
hdr := signedHeaderForHeight(5, 0x01)
598-
m.SetPendingSignedHeader(hdr, "p2p")
599-
m.RemovePendingSignedHeader(5)
600-
m.RemovePendingSignedHeader(5)
601-
_, _, ok := m.GetPendingSignedHeader(5)
602-
require.False(t, ok)
603-
}
604-
605-
func TestManager_DeleteHeight_EvictsPendingSignedHeader(t *testing.T) {
606-
t.Parallel()
607-
cfg := tempConfig(t)
608-
st := testMemStore(t)
609-
610-
m, err := NewManager(cfg, st, zerolog.Nop())
611-
require.NoError(t, err)
612-
613-
hdr := signedHeaderForHeight(5, 0x01)
614-
m.SetPendingSignedHeader(hdr, "p2p")
615-
_, _, ok := m.GetPendingSignedHeader(5)
616-
require.True(t, ok)
617-
618-
m.DeleteHeight(5)
619-
620-
_, _, ok = m.GetPendingSignedHeader(5)
621-
require.False(t, ok)
622-
}
623-
624-
func TestManager_ClearFromStore_ResetsPendingSignedHeaders(t *testing.T) {
625-
t.Parallel()
626-
cfg := tempConfig(t)
627-
st := testMemStore(t)
628-
629-
m, err := NewManager(cfg, st, zerolog.Nop())
630-
require.NoError(t, err)
631-
632-
m.SetPendingSignedHeader(signedHeaderForHeight(5, 0x01), "p2p")
633-
m.SetPendingSignedHeader(signedHeaderForHeight(6, 0x02), "da")
634-
635-
impl, ok := m.(*implementation)
636-
require.True(t, ok)
637-
require.NoError(t, impl.ClearFromStore())
638-
639-
for _, h := range []uint64{5, 6} {
640-
_, _, present := m.GetPendingSignedHeader(h)
641-
require.False(t, present, "pending entry at %d must be cleared", h)
642-
}
643-
}
644-
645-
// Race-detector coverage for the pending-signed-header map. Run with -race.
646-
func TestManager_PendingSignedHeader_Concurrency(t *testing.T) {
647-
t.Parallel()
648-
cfg := tempConfig(t)
649-
st := testMemStore(t)
650-
651-
m, err := NewManager(cfg, st, zerolog.Nop())
652-
require.NoError(t, err)
653-
654-
const (
655-
writers = 8
656-
readers = 8
657-
removers = 4
658-
heightsPerRun = 200
659-
)
660-
661-
headers := make([]*types.SignedHeader, heightsPerRun)
662-
for i := range headers {
663-
headers[i] = signedHeaderForHeight(uint64(i+1), byte(i&0xff))
664-
}
665-
666-
var (
667-
wg sync.WaitGroup
668-
startCh = make(chan struct{})
669-
writerHit atomic.Int64
670-
readerHit atomic.Int64
671-
)
672-
673-
for w := range writers {
674-
wg.Add(1)
675-
go func(seed int) {
676-
defer wg.Done()
677-
<-startCh
678-
for i := range heightsPerRun {
679-
idx := (seed*7 + i) % heightsPerRun
680-
m.SetPendingSignedHeader(headers[idx], "p2p")
681-
writerHit.Add(1)
682-
}
683-
}(w)
684-
}
685-
686-
for r := range readers {
687-
wg.Add(1)
688-
go func(seed int) {
689-
defer wg.Done()
690-
<-startCh
691-
for i := range heightsPerRun {
692-
h := uint64((seed*11+i)%heightsPerRun + 1)
693-
_, _, _ = m.GetPendingSignedHeader(h)
694-
readerHit.Add(1)
695-
}
696-
}(r)
697-
}
698-
699-
for d := range removers {
700-
wg.Add(1)
701-
go func(seed int) {
702-
defer wg.Done()
703-
<-startCh
704-
for i := range heightsPerRun {
705-
h := uint64((seed*13+i)%heightsPerRun + 1)
706-
if i%2 == 0 {
707-
m.RemovePendingSignedHeader(h)
708-
} else {
709-
m.DeleteHeight(h)
710-
}
711-
}
712-
}(d)
713-
}
714-
715-
close(startCh)
716-
wg.Wait()
717-
718-
require.Equal(t, int64(writers*heightsPerRun), writerHit.Load())
719-
require.Equal(t, int64(readers*heightsPerRun), readerHit.Load())
720-
}
721-
722521
func TestManager_DaHeightFromStoreOnRestore(t *testing.T) {
723522
t.Parallel()
724523

0 commit comments

Comments
 (0)