Skip to content

Commit 6d4b404

Browse files
committed
approach 2
1 parent f8a2cdb commit 6d4b404

29 files changed

Lines changed: 1117 additions & 1739 deletions

block/internal/da/client.go

Lines changed: 163 additions & 139 deletions
Large diffs are not rendered by default.

block/internal/da/client_test.go

Lines changed: 102 additions & 416 deletions
Large diffs are not rendered by default.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package da
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/celestiaorg/go-square/v3/share"
9+
10+
"github.com/evstack/ev-node/pkg/blob"
11+
)
12+
13+
// LocalBlobAPI is a simple in-memory BlobAPI implementation for tests.
14+
type LocalBlobAPI struct {
15+
mu sync.Mutex
16+
height uint64
17+
maxSize uint64
18+
byHeight map[uint64][]*blob.Blob
19+
}
20+
21+
// NewLocalBlobAPI creates an in-memory BlobAPI with a max blob size.
22+
func NewLocalBlobAPI(maxSize uint64) *LocalBlobAPI {
23+
return &LocalBlobAPI{
24+
maxSize: maxSize,
25+
byHeight: make(map[uint64][]*blob.Blob),
26+
}
27+
}
28+
29+
func (l *LocalBlobAPI) Submit(ctx context.Context, blobs []*blob.Blob, _ *blob.SubmitOptions) (uint64, error) {
30+
l.mu.Lock()
31+
defer l.mu.Unlock()
32+
33+
for i, b := range blobs {
34+
if uint64(len(b.Data())) > l.maxSize {
35+
return 0, fmt.Errorf("blob %d too big", i)
36+
}
37+
}
38+
39+
l.height++
40+
// store clones to avoid external mutation
41+
stored := make([]*blob.Blob, len(blobs))
42+
for i, b := range blobs {
43+
stored[i] = b
44+
}
45+
l.byHeight[l.height] = append(l.byHeight[l.height], stored...)
46+
return l.height, nil
47+
}
48+
49+
func (l *LocalBlobAPI) GetAll(ctx context.Context, height uint64, namespaces []share.Namespace) ([]*blob.Blob, error) {
50+
l.mu.Lock()
51+
defer l.mu.Unlock()
52+
53+
nsMap := make(map[string]struct{}, len(namespaces))
54+
for _, ns := range namespaces {
55+
nsMap[string(ns.Bytes())] = struct{}{}
56+
}
57+
58+
blobs, ok := l.byHeight[height]
59+
if !ok {
60+
return []*blob.Blob{}, nil
61+
}
62+
var out []*blob.Blob
63+
for _, b := range blobs {
64+
if _, ok := nsMap[string(b.Namespace().Bytes())]; ok {
65+
out = append(out, b)
66+
}
67+
}
68+
return out, nil
69+
}
70+
71+
func (l *LocalBlobAPI) GetProof(ctx context.Context, height uint64, namespace share.Namespace, commitment blob.Commitment) (*blob.Proof, error) {
72+
return &blob.Proof{}, nil
73+
}
74+
75+
func (l *LocalBlobAPI) Included(ctx context.Context, height uint64, namespace share.Namespace, proof *blob.Proof, commitment blob.Commitment) (bool, error) {
76+
return true, nil
77+
}

block/internal/submitting/da_submitter.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ import (
1313
"github.com/evstack/ev-node/block/internal/cache"
1414
"github.com/evstack/ev-node/block/internal/common"
1515
"github.com/evstack/ev-node/block/internal/da"
16-
coreda "github.com/evstack/ev-node/core/da"
16+
"github.com/evstack/ev-node/pkg/blob"
1717
"github.com/evstack/ev-node/pkg/config"
1818
pkgda "github.com/evstack/ev-node/pkg/da"
19+
datypes "github.com/evstack/ev-node/pkg/da/types"
1920
"github.com/evstack/ev-node/pkg/genesis"
2021
"github.com/evstack/ev-node/pkg/rpc/server"
2122
"github.com/evstack/ev-node/pkg/signer"
@@ -119,7 +120,21 @@ func NewDASubmitter(
119120

120121
if config.RPC.EnableDAVisualization {
121122
visualizerLogger := logger.With().Str("component", "da_visualization").Logger()
122-
server.SetDAVisualizationServer(server.NewDAVisualizationServer(client.GetDA(), visualizerLogger, config.Node.Aggregator))
123+
server.SetDAVisualizationServer(
124+
server.NewDAVisualizationServer(
125+
func(ctx context.Context, id []byte, ns []byte) ([]datypes.Blob, error) {
126+
// minimal fetch: derive height from ID and use namespace provided
127+
height, _ := blob.SplitID(id)
128+
res := client.Retrieve(ctx, height, ns)
129+
if res.Code != datypes.StatusSuccess || len(res.Data) == 0 {
130+
return nil, fmt.Errorf("blob not found")
131+
}
132+
return res.Data, nil
133+
},
134+
visualizerLogger,
135+
config.Node.Aggregator,
136+
),
137+
)
123138
}
124139

125140
// Use NoOp metrics if nil to avoid nil checks throughout the code
@@ -184,7 +199,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er
184199
}
185200
return proto.Marshal(headerPb)
186201
},
187-
func(submitted []*types.SignedHeader, res *coreda.ResultSubmit) {
202+
func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) {
188203
for _, header := range submitted {
189204
cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height())
190205
}
@@ -227,7 +242,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe
227242
func(signedData *types.SignedData) ([]byte, error) {
228243
return signedData.MarshalBinary()
229244
},
230-
func(submitted []*types.SignedData, res *coreda.ResultSubmit) {
245+
func(submitted []*types.SignedData, res *datypes.ResultSubmit) {
231246
for _, sd := range submitted {
232247
cache.SetDataDAIncluded(sd.Data.DACommitment().String(), res.Height, sd.Height())
233248
}
@@ -343,7 +358,7 @@ func submitToDA[T any](
343358
ctx context.Context,
344359
items []T,
345360
marshalFn func(T) ([]byte, error),
346-
postSubmit func([]T, *coreda.ResultSubmit),
361+
postSubmit func([]T, *datypes.ResultSubmit),
347362
itemType string,
348363
namespace []byte,
349364
options []byte,
@@ -406,7 +421,7 @@ func submitToDA[T any](
406421

407422
// Perform submission
408423
start := time.Now()
409-
res := s.client.Submit(submitCtx, marshaled, -1, namespace, mergedOptions)
424+
res := s.client.Submit(submitCtx, marshaled, namespace, mergedOptions)
410425
s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia")
411426

412427
// Record submission result for observability
@@ -415,7 +430,7 @@ func submitToDA[T any](
415430
}
416431

417432
switch res.Code {
418-
case coreda.StatusSuccess:
433+
case datypes.StatusSuccess:
419434
submitted := items[:res.SubmittedCount]
420435
postSubmit(submitted, &res)
421436
s.logger.Info().Str("itemType", itemType).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer")
@@ -436,7 +451,7 @@ func submitToDA[T any](
436451
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
437452
}
438453

439-
case coreda.StatusTooBig:
454+
case datypes.StatusTooBig:
440455
// Record failure metric
441456
s.recordFailure(common.DASubmitterFailureReasonTooBig)
442457
// Iteratively halve until it fits or single-item too big
@@ -460,19 +475,19 @@ func submitToDA[T any](
460475
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
461476
}
462477

463-
case coreda.StatusNotIncludedInBlock:
478+
case datypes.StatusNotIncludedInBlock:
464479
// Record failure metric
465480
s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock)
466481
s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state")
467482
rs.Next(reasonMempool, pol)
468483

469-
case coreda.StatusAlreadyInMempool:
484+
case datypes.StatusAlreadyInMempool:
470485
// Record failure metric
471486
s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool)
472487
s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state")
473488
rs.Next(reasonMempool, pol)
474489

475-
case coreda.StatusContextCanceled:
490+
case datypes.StatusContextCanceled:
476491
// Record failure metric
477492
s.recordFailure(common.DASubmitterFailureReasonContextCanceled)
478493
s.logger.Info().Msg("DA layer submission canceled due to context cancellation")

block/internal/submitting/da_submitter_integration_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
//go:build !ignore
2+
// +build !ignore
3+
14
package submitting
25

36
import (
@@ -16,7 +19,6 @@ import (
1619
"github.com/evstack/ev-node/block/internal/cache"
1720
"github.com/evstack/ev-node/block/internal/common"
1821
"github.com/evstack/ev-node/block/internal/da"
19-
coreda "github.com/evstack/ev-node/core/da"
2022
"github.com/evstack/ev-node/pkg/config"
2123
"github.com/evstack/ev-node/pkg/genesis"
2224
"github.com/evstack/ev-node/pkg/signer/noop"
@@ -83,15 +85,13 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
8385
require.NoError(t, batch2.SetHeight(2))
8486
require.NoError(t, batch2.Commit())
8587

86-
// Dummy DA
87-
dummyDA := coreda.NewDummyDA(10_000_000, 10*time.Millisecond)
88-
89-
// Create DA submitter
88+
// Create DA submitter with local blob API
9089
daClient := da.NewClient(da.Config{
91-
DA: dummyDA,
92-
Logger: zerolog.Nop(),
93-
Namespace: cfg.DA.Namespace,
94-
DataNamespace: cfg.DA.DataNamespace,
90+
BlobAPI: da.NewLocalBlobAPI(common.DefaultMaxBlobSize),
91+
Logger: zerolog.Nop(),
92+
Namespace: cfg.DA.Namespace,
93+
DataNamespace: cfg.DA.DataNamespace,
94+
DefaultTimeout: 10 * time.Second,
9595
})
9696
daSubmitter := NewDASubmitter(daClient, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop())
9797

0 commit comments

Comments
 (0)