Skip to content

Commit cf5d8b6

Browse files
Manav-AggarwalConnor O'Hara
andauthored
Enforce centralized sequencer in p2p block gossiping (#1418)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. --> ## Overview Resolves #1417, resolves #1404 <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. --> ## Checklist <!-- Please complete the checklist to ensure that the PR is ready to be reviewed. IMPORTANT: PRs should be left in Draft until the below checklist is completed. --> - [x] New and updated code has appropriate documentation - [x] New and updated code has new and/or updated testing - [x] Required CI checks are passing - [ ] Visual proof for any user facing features like CLI or documentation updates - [x] Linked issues closed with keywords <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced block validation to reject certain blocks early in the process. - Implemented centralized sequencer validation to ensure block sequencing integrity. - **Bug Fixes** - Adjusted the initialization of nodes in test suites to align with updated genesis document and signing key configurations. - Updated error handling in block management to provide more informative feedback. - **Refactor** - Streamlined block manager initialization with improved error handling. - Updated test cases to reflect new validator set and signing key initialization methods. - Refined node test initialization to include signing key handling and error propagation. - **Documentation** - Added new error messages to improve clarity on validation failures. - **Tests** - Enhanced integration and unit tests to cover new validation checks and error conditions. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Connor O'Hara <connor@switchboard.xyz>
1 parent 2c69346 commit cf5d8b6

10 files changed

Lines changed: 302 additions & 229 deletions

block/manager.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,6 @@ func NewManager(
133133
s.DAHeight = conf.DAStartHeight
134134
}
135135

136-
proposerAddress, err := getAddress(proposerKey)
137-
if err != nil {
138-
return nil, err
139-
}
140-
141136
if conf.DABlockTime == 0 {
142137
logger.Info("Using default DA block time", "DABlockTime", defaultDABlockTime)
143138
conf.DABlockTime = defaultDABlockTime
@@ -148,6 +143,11 @@ func NewManager(
148143
conf.BlockTime = defaultBlockTime
149144
}
150145

146+
proposerAddress, err := getAddress(proposerKey)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151151
exec := state.NewBlockExecutor(proposerAddress, genesis.ChainID, mempool, proxyApp, eventBus, logger)
152152
if s.LastBlockHeight+1 == uint64(genesis.InitialHeight) {
153153
res, err := exec.InitChain(genesis)
@@ -465,6 +465,10 @@ func (m *Manager) BlockStoreRetrieveLoop(ctx context.Context) {
465465
return
466466
default:
467467
}
468+
// early validation to reject junk blocks
469+
if !m.isUsingExpectedCentralizedSequencer(block) {
470+
continue
471+
}
468472
m.logger.Debug("block retrieved from p2p block sync", "blockHeight", block.Height(), "daHeight", daHeight)
469473
m.blockInCh <- NewBlockEvent{block, daHeight}
470474
}
@@ -547,12 +551,8 @@ func (m *Manager) processNextDABlock(ctx context.Context) error {
547551
}
548552
m.logger.Debug("retrieved potential blocks", "n", len(blockResp.Blocks), "daHeight", daHeight)
549553
for _, block := range blockResp.Blocks {
550-
// received block is not from the expected centralized sequencer
551-
if !bytes.Equal(block.SignedHeader.ProposerAddress, m.genesis.Validators[0].Address.Bytes()) {
552-
continue
553-
}
554554
// early validation to reject junk blocks
555-
if block.ValidateBasic() != nil {
555+
if !m.isUsingExpectedCentralizedSequencer(block) {
556556
continue
557557
}
558558
blockHash := block.Hash().String()
@@ -588,6 +588,10 @@ func (m *Manager) processNextDABlock(ctx context.Context) error {
588588
return err
589589
}
590590

591+
func (m *Manager) isUsingExpectedCentralizedSequencer(block *types.Block) bool {
592+
return bytes.Equal(block.SignedHeader.ProposerAddress, m.genesis.Validators[0].Address.Bytes()) && block.ValidateBasic() == nil
593+
}
594+
591595
func (m *Manager) fetchBlock(ctx context.Context, daHeight uint64) (da.ResultRetrieveBlocks, error) {
592596
var err error
593597
blockRes := m.dalc.RetrieveBlocks(ctx, daHeight)

block/manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import (
2020
)
2121

2222
func TestInitialState(t *testing.T) {
23-
genesisValidators, _ := types.GetGenesisValidatorSetWithSigner()
23+
genesisDoc, _ := types.GetGenesisWithPrivkey()
2424
genesis := &cmtypes.GenesisDoc{
2525
ChainID: "genesis id",
2626
InitialHeight: 100,
27-
Validators: genesisValidators,
27+
Validators: genesisDoc.Validators,
2828
}
2929
sampleState := types.State{
3030
ChainID: "state id",

node/full_client_test.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ func getRPC(t *testing.T) (*mocks.Application, *FullClient) {
6565
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
6666
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
6767
ctx := context.Background()
68-
genesisValidators, signingKey := types.GetGenesisValidatorSetWithSigner()
68+
genesisDoc, genesisValidatorKey := types.GetGenesisWithPrivkey()
69+
signingKey, err := types.PrivKeyToSigningKey(genesisValidatorKey)
70+
require.NoError(err)
6971
node, err := newFullNode(
7072
ctx,
7173
config.NodeConfig{
@@ -74,10 +76,7 @@ func getRPC(t *testing.T) (*mocks.Application, *FullClient) {
7476
key,
7577
signingKey,
7678
proxy.NewLocalClientCreator(app),
77-
&cmtypes.GenesisDoc{
78-
ChainID: "test",
79-
Validators: genesisValidators,
80-
},
79+
genesisDoc,
8180
log.TestingLogger(),
8281
)
8382
require.NoError(err)
@@ -492,7 +491,9 @@ func TestTx(t *testing.T) {
492491
mockApp.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
493492
mockApp.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
494493
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
495-
genesisValidators, signingKey := types.GetGenesisValidatorSetWithSigner()
494+
genesisDoc, genesisValidatorKey := types.GetGenesisWithPrivkey()
495+
signingKey, err := types.PrivKeyToSigningKey(genesisValidatorKey)
496+
require.NoError(err)
496497
ctx, cancel := context.WithCancel(context.Background())
497498
defer cancel()
498499
node, err := newFullNode(ctx, config.NodeConfig{
@@ -502,7 +503,7 @@ func TestTx(t *testing.T) {
502503
BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly
503504
}},
504505
key, signingKey, proxy.NewLocalClientCreator(mockApp),
505-
&cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators},
506+
genesisDoc,
506507
test.NewFileLogger(t))
507508
require.NoError(err)
508509
require.NotNil(node)
@@ -740,7 +741,9 @@ func TestMempool2Nodes(t *testing.T) {
740741
assert := assert.New(t)
741742
require := require.New(t)
742743

743-
genesisValidators, signingKey1 := types.GetGenesisValidatorSetWithSigner()
744+
genesisDoc, genesisValidatorKey := types.GetGenesisWithPrivkey()
745+
signingKey1, err := types.PrivKeyToSigningKey(genesisValidatorKey)
746+
require.NoError(err)
744747

745748
app := &mocks.Application{}
746749
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
@@ -769,7 +772,7 @@ func TestMempool2Nodes(t *testing.T) {
769772
ListenAddress: "/ip4/127.0.0.1/tcp/9001",
770773
},
771774
BlockManagerConfig: getBMConfig(),
772-
}, key1, signingKey1, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
775+
}, key1, signingKey1, proxy.NewLocalClientCreator(app), genesisDoc, log.TestingLogger())
773776
require.NoError(err)
774777
require.NotNil(node1)
775778

@@ -779,7 +782,7 @@ func TestMempool2Nodes(t *testing.T) {
779782
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
780783
Seeds: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.Pretty(),
781784
},
782-
}, key2, signingKey2, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
785+
}, key2, signingKey2, proxy.NewLocalClientCreator(app), genesisDoc, log.TestingLogger())
783786
require.NoError(err)
784787
require.NotNil(node2)
785788

@@ -834,8 +837,10 @@ func TestStatus(t *testing.T) {
834837
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
835838
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
836839
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
837-
genesisValidators, signingKey := types.GetGenesisValidatorSetWithSigner()
838-
pubKey := genesisValidators[0].PubKey
840+
genesisDoc, genesisValidatorKey := types.GetGenesisWithPrivkey()
841+
signingKey, err := types.PrivKeyToSigningKey(genesisValidatorKey)
842+
require.NoError(err)
843+
pubKey := genesisDoc.Validators[0].PubKey
839844

840845
node, err := newFullNode(
841846
context.Background(),
@@ -852,10 +857,7 @@ func TestStatus(t *testing.T) {
852857
key,
853858
signingKey,
854859
proxy.NewLocalClientCreator(app),
855-
&cmtypes.GenesisDoc{
856-
ChainID: "test",
857-
Validators: genesisValidators,
858-
},
860+
genesisDoc,
859861
test.NewFileLogger(t),
860862
)
861863
require.NoError(err)
@@ -890,8 +892,8 @@ func TestStatus(t *testing.T) {
890892
assert.Equal(int64(2), resp.SyncInfo.LatestBlockHeight)
891893

892894
// Changed the RPC method to get this from the genesis.
893-
assert.Equal(genesisValidators[0].Address, resp.ValidatorInfo.Address)
894-
assert.Equal(genesisValidators[0].PubKey, resp.ValidatorInfo.PubKey)
895+
assert.Equal(genesisDoc.Validators[0].Address, resp.ValidatorInfo.Address)
896+
assert.Equal(genesisDoc.Validators[0].PubKey, resp.ValidatorInfo.PubKey)
895897
// hardcode to 1, shouldn't matter because it's a centralized sequencer
896898
assert.Equal(int64(1), resp.ValidatorInfo.VotingPower)
897899

@@ -945,7 +947,9 @@ func TestFutureGenesisTime(t *testing.T) {
945947
mockApp.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
946948
mockApp.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
947949
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
948-
genesisValidators, signingKey := types.GetGenesisValidatorSetWithSigner()
950+
genesisDoc, genesisValidatorKey := types.GetGenesisWithPrivkey()
951+
signingKey, err := types.PrivKeyToSigningKey(genesisValidatorKey)
952+
require.NoError(err)
949953
genesisTime := time.Now().Local().Add(time.Second * time.Duration(1))
950954
ctx, cancel := context.WithCancel(context.Background())
951955
defer cancel()
@@ -961,7 +965,7 @@ func TestFutureGenesisTime(t *testing.T) {
961965
ChainID: "test",
962966
InitialHeight: 1,
963967
GenesisTime: genesisTime,
964-
Validators: genesisValidators,
968+
Validators: genesisDoc.Validators,
965969
},
966970
test.NewFileLogger(t))
967971
require.NoError(err)

node/full_node_integration_test.go

Lines changed: 8 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package node
22

33
import (
4-
"bytes"
54
"context"
65
"crypto/rand"
76
"errors"
@@ -13,7 +12,6 @@ import (
1312
"time"
1413

1514
"github.com/cometbft/cometbft/crypto/ed25519"
16-
"github.com/cometbft/cometbft/p2p"
1715
"github.com/stretchr/testify/assert"
1816

1917
abci "github.com/cometbft/cometbft/abci/types"
@@ -26,7 +24,6 @@ import (
2624
"github.com/stretchr/testify/require"
2725

2826
"github.com/rollkit/rollkit/config"
29-
"github.com/rollkit/rollkit/da"
3027
test "github.com/rollkit/rollkit/test/log"
3128
"github.com/rollkit/rollkit/test/mocks"
3229
"github.com/rollkit/rollkit/types"
@@ -40,78 +37,6 @@ func prepareProposalResponse(_ context.Context, req *abci.RequestPrepareProposal
4037
}, nil
4138
}
4239

43-
func TestCentralizedSequencer(t *testing.T) {
44-
require := require.New(t)
45-
assert := assert.New(t)
46-
genDoc, privkey := types.GetGenesisWithPrivkey()
47-
genDoc.AppHash = make([]byte, 32)
48-
nodeKey := &p2p.NodeKey{
49-
PrivKey: privkey,
50-
}
51-
signingKey, err := types.GetNodeKey(nodeKey)
52-
require.NoError(err)
53-
54-
app := &mocks.Application{}
55-
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
56-
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
57-
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
58-
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
59-
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(finalizeBlockResponse)
60-
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
61-
62-
vals := types.GetValidatorSetFromGenesis(genDoc)
63-
64-
dalc := getMockDA()
65-
66-
blockManagerConfig := config.BlockManagerConfig{
67-
BlockTime: 1 * time.Second,
68-
DAStartHeight: 1,
69-
DABlockTime: 1 * time.Second,
70-
}
71-
ctx, cancel := context.WithCancel(context.Background())
72-
defer cancel()
73-
node, err := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr, Aggregator: false, BlockManagerConfig: blockManagerConfig}, signingKey, signingKey, proxy.NewLocalClientCreator(app), genDoc, test.NewFileLogger(t))
74-
require.NoError(err)
75-
node.dalc = dalc
76-
node.blockManager.SetDALC(dalc)
77-
78-
err = node.Start()
79-
defer func() {
80-
assert.NoError(node.Stop())
81-
}()
82-
require.NoError(err)
83-
84-
lastState, err := node.Store.GetState()
85-
require.NoError(err)
86-
lastResults := lastState.LastResultsHash
87-
88-
validBlock, err := types.GetFirstBlock(privkey, &vals, lastResults, false, false)
89-
require.NoError(err)
90-
err = validBlock.ValidateBasic()
91-
require.NoError(err)
92-
junkProposerBlock, err := types.GetFirstBlock(privkey, &vals, lastResults, true, false)
93-
require.NoError(err)
94-
err = junkProposerBlock.ValidateBasic()
95-
require.Error(err)
96-
sigInvalidBlock, err := types.GetFirstBlock(privkey, &vals, lastResults, false, true)
97-
require.NoError(err)
98-
err = sigInvalidBlock.ValidateBasic()
99-
require.Error(err)
100-
submitResp := dalc.SubmitBlocks(ctx, []*types.Block{validBlock, junkProposerBlock, sigInvalidBlock})
101-
fmt.Println(submitResp)
102-
require.Equal(submitResp.Code, da.StatusSuccess)
103-
require.NoError(testutils.Retry(300, 100*time.Millisecond, func() error {
104-
block, err := node.Store.GetBlock(1)
105-
if err != nil {
106-
return err
107-
}
108-
if !bytes.Equal(block.Hash(), validBlock.Hash()) {
109-
return fmt.Errorf("unexpected block")
110-
}
111-
return nil
112-
}))
113-
}
114-
11540
func TestAggregatorMode(t *testing.T) {
11641
assert := assert.New(t)
11742
require := require.New(t)
@@ -125,13 +50,15 @@ func TestAggregatorMode(t *testing.T) {
12550
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
12651

12752
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
128-
genesisValidators, signingKey := types.GetGenesisValidatorSetWithSigner()
53+
genesisDoc, genesisValidatorKey := types.GetGenesisWithPrivkey()
54+
signingKey, err := types.PrivKeyToSigningKey(genesisValidatorKey)
55+
require.NoError(err)
12956
blockManagerConfig := config.BlockManagerConfig{
13057
BlockTime: 1 * time.Second,
13158
}
13259
ctx, cancel := context.WithCancel(context.Background())
13360
defer cancel()
134-
node, err := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr, Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
61+
node, err := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr, Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, proxy.NewLocalClientCreator(app), genesisDoc, log.TestingLogger())
13562
require.NoError(err)
13663
require.NotNil(node)
13764

@@ -238,7 +165,9 @@ func TestLazyAggregator(t *testing.T) {
238165
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
239166

240167
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
241-
genesisValidators, signingKey := types.GetGenesisValidatorSetWithSigner()
168+
genesisDoc, genesisValidatorKey := types.GetGenesisWithPrivkey()
169+
signingKey, err := types.PrivKeyToSigningKey(genesisValidatorKey)
170+
require.NoError(err)
242171
blockManagerConfig := config.BlockManagerConfig{
243172
// After the genesis header is published, the syncer is started
244173
// which takes little longer (due to initialization) and the syncer
@@ -257,7 +186,7 @@ func TestLazyAggregator(t *testing.T) {
257186
Aggregator: true,
258187
BlockManagerConfig: blockManagerConfig,
259188
LazyAggregator: true,
260-
}, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
189+
}, key, signingKey, proxy.NewLocalClientCreator(app), genesisDoc, log.TestingLogger())
261190
assert.False(node.IsRunning())
262191
assert.NoError(err)
263192

0 commit comments

Comments
 (0)