diff --git a/runner/benchmark/definition.go b/runner/benchmark/definition.go index 0b869747..dce60985 100644 --- a/runner/benchmark/definition.go +++ b/runner/benchmark/definition.go @@ -99,6 +99,18 @@ type DatadirConfig struct { Validator *string `yaml:"validator"` } +// ReplayConfig specifies configuration for replaying transactions from an +// external node instead of generating synthetic transactions. +type ReplayConfig struct { + // SourceRPCURL is the RPC endpoint of the node to fetch transactions from + SourceRPCURL string `yaml:"source_rpc_url"` + + // StartBlock is the first block to replay transactions from. + // If not specified (0), it will be automatically detected from the + // snapshot's head block + 1. + StartBlock uint64 `yaml:"start_block,omitempty"` +} + // TestDefinition is the user-facing YAML configuration for specifying a // matrix of benchmark runs. type TestDefinition struct { @@ -108,6 +120,7 @@ type TestDefinition struct { Tags *map[string]string `yaml:"tags"` Variables []Param `yaml:"variables"` ProofProgram *ProofProgramOptions `yaml:"proof_program"` + Replay *ReplayConfig `yaml:"replay"` } func (bc *TestDefinition) Check() error { diff --git a/runner/benchmark/matrix.go b/runner/benchmark/matrix.go index bfdc8d6c..569c1e43 100644 --- a/runner/benchmark/matrix.go +++ b/runner/benchmark/matrix.go @@ -17,6 +17,7 @@ type TestPlan struct { Snapshot *SnapshotDefinition ProofProgram *ProofProgramOptions Thresholds *ThresholdConfig + Replay *ReplayConfig } func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *BenchmarkConfig) (*TestPlan, error) { @@ -42,6 +43,7 @@ func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *Benchm Snapshot: c.Snapshot, ProofProgram: proofProgram, Thresholds: c.Metrics, + Replay: c.Replay, }, nil } diff --git a/runner/network/consensus/client.go b/runner/network/consensus/client.go index aa538135..ef449990 100644 --- a/runner/network/consensus/client.go +++ b/runner/network/consensus/client.go @@ -21,6 +21,10 @@ type ConsensusClientOptions struct { GasLimit uint64 // GasLimitSetup is the gas limit for the setup payload GasLimitSetup uint64 + // AllowTxFailures allows transactions to fail without stopping the benchmark. + // When true, failed transactions are logged as warnings instead of errors. + // Useful for replay mode where some transactions may fail due to state differences. + AllowTxFailures bool } // BaseConsensusClient contains common functionality shared between different consensus client implementations. diff --git a/runner/network/consensus/validator_consensus.go b/runner/network/consensus/validator_consensus.go index ce9298ff..9b72f512 100644 --- a/runner/network/consensus/validator_consensus.go +++ b/runner/network/consensus/validator_consensus.go @@ -64,18 +64,18 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex } // Start starts the fake consensus client. -func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64) error { +func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, lastSetupBlock uint64) error { f.log.Info("Starting sync benchmark", "num_payloads", len(payloads)) m := metrics.NewBlockMetrics() for i := 0; i < len(payloads); i++ { - m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock)))) + m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(lastSetupBlock)))) f.log.Info("Proposing payload", "payload_index", i) err := f.propose(ctx, &payloads[i], m) if err != nil { return err } - if payloads[i].Number >= firstTestBlock { + if payloads[i].Number > lastSetupBlock { err = metricsCollector.Collect(ctx, m) if err != nil { f.log.Error("Failed to collect metrics", "error", err) diff --git a/runner/network/mempool/replay_mempool.go b/runner/network/mempool/replay_mempool.go new file mode 100644 index 00000000..cbd86e59 --- /dev/null +++ b/runner/network/mempool/replay_mempool.go @@ -0,0 +1,136 @@ +package mempool + +import ( + "context" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" +) + +// ReplayMempool fetches transactions from an external node and replays them. +// It iterates through blocks from a source node and provides transactions +// block-by-block for the benchmark to replay. +type ReplayMempool struct { + log log.Logger + client *ethclient.Client + + lock sync.Mutex + + // startBlock is the first block to fetch transactions from + startBlock uint64 + + // currentBlock tracks which block we're fetching next + currentBlock uint64 + + // chainID for transaction signing validation + chainID *big.Int + + // addressNonce tracks the latest nonce for each address + addressNonce map[common.Address]uint64 +} + +// NewReplayMempool creates a new ReplayMempool that fetches transactions +// from the given RPC endpoint starting from the specified block. +func NewReplayMempool(log log.Logger, rpcURL string, startBlock uint64, chainID *big.Int) (*ReplayMempool, error) { + client, err := ethclient.Dial(rpcURL) + if err != nil { + return nil, err + } + + return &ReplayMempool{ + log: log, + client: client, + startBlock: startBlock, + currentBlock: startBlock, + chainID: chainID, + addressNonce: make(map[common.Address]uint64), + }, nil +} + +// AddTransactions is a no-op for ReplayMempool since transactions come from the source node. +func (m *ReplayMempool) AddTransactions(_ []*types.Transaction) { + // No-op: transactions are fetched from the source node, not added manually +} + +// NextBlock fetches the next block from the source node and returns its transactions. +// Returns (mempoolTxs, sequencerTxs) where: +// - mempoolTxs: regular transactions to be sent via eth_sendRawTransaction +// - sequencerTxs: deposit transactions to be included in payload attributes +func (m *ReplayMempool) NextBlock() ([][]byte, [][]byte) { + m.lock.Lock() + defer m.lock.Unlock() + + ctx := context.Background() + + block, err := m.client.BlockByNumber(ctx, big.NewInt(int64(m.currentBlock))) + if err != nil { + m.log.Warn("Failed to fetch block", "block", m.currentBlock, "error", err) + return nil, nil + } + + m.log.Info("Fetched block for replay", + "block", m.currentBlock, + "txs", len(block.Transactions()), + "gas_used", block.GasUsed(), + ) + + m.currentBlock++ + + mempoolTxs := make([][]byte, 0) + sequencerTxs := make([][]byte, 0) + + for _, tx := range block.Transactions() { + // Track nonces for GetTransactionCount + from, err := types.Sender(types.NewIsthmusSigner(m.chainID), tx) + if err != nil { + // Try with London signer for older transactions + from, err = types.Sender(types.NewLondonSigner(m.chainID), tx) + if err != nil { + m.log.Warn("Failed to get sender", "tx", tx.Hash(), "error", err) + continue + } + } + m.addressNonce[from] = tx.Nonce() + + txBytes, err := tx.MarshalBinary() + if err != nil { + m.log.Warn("Failed to marshal transaction", "tx", tx.Hash(), "error", err) + continue + } + + // Deposit transactions go to sequencer, others go to mempool + if tx.Type() == types.DepositTxType { + sequencerTxs = append(sequencerTxs, txBytes) + } else { + mempoolTxs = append(mempoolTxs, txBytes) + } + } + + return mempoolTxs, sequencerTxs +} + +// GetTransactionCount returns the latest nonce for an address. +func (m *ReplayMempool) GetTransactionCount(address common.Address) uint64 { + m.lock.Lock() + defer m.lock.Unlock() + return m.addressNonce[address] +} + +// CurrentBlock returns the current block number being replayed. +func (m *ReplayMempool) CurrentBlock() uint64 { + m.lock.Lock() + defer m.lock.Unlock() + return m.currentBlock +} + +// Close closes the underlying RPC client connection. +func (m *ReplayMempool) Close() { + m.client.Close() +} + +var _ FakeMempool = &ReplayMempool{} + diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go index 15bc8443..3cda71ff 100644 --- a/runner/network/network_benchmark.go +++ b/runner/network/network_benchmark.go @@ -45,10 +45,11 @@ type NetworkBenchmark struct { transactionPayload payload.Definition ports portmanager.PortManager + replayConfig *benchmark.ReplayConfig } // NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client -func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager) (*NetworkBenchmark, error) { +func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager, replayConfig *benchmark.ReplayConfig) (*NetworkBenchmark, error) { return &NetworkBenchmark{ log: log, sequencerOptions: sequencerOptions, @@ -57,6 +58,7 @@ func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequence proofConfig: proofConfig, transactionPayload: transactionPayload, ports: ports, + replayConfig: replayConfig, }, nil } @@ -107,15 +109,35 @@ func (nb *NetworkBenchmark) benchmarkSequencer(ctx context.Context, l1Chain *l1C } }() - benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload) - executionData, lastBlock, err := benchmark.Run(ctx, metricsCollector) + var executionData []engine.ExecutableData + var lastBlock uint64 + + // Use replay benchmark if replay config is provided + if nb.replayConfig != nil { + nb.log.Info("Using replay sequencer benchmark", + "source_rpc", nb.replayConfig.SourceRPCURL, + "start_block", nb.replayConfig.StartBlock, + ) + replayBenchmark := NewReplaySequencerBenchmark( + nb.log, + *nb.testConfig, + sequencerClient, + l1Chain, + nb.replayConfig.SourceRPCURL, + nb.replayConfig.StartBlock, + ) + executionData, lastBlock, err = replayBenchmark.Run(ctx, metricsCollector) + } else { + benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload) + executionData, lastBlock, err = benchmark.Run(ctx, metricsCollector) + } if err != nil { sequencerClient.Stop() - return nil, 0, nil, fmt.Errorf("failed to run sequencer benchmark: %w", err) + return nil, 0, nil, err } - return executionData, lastBlock, sequencerClient, nil + return executionData, lastBlock, sequencerClient, err } func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64, l1Chain *l1Chain, sequencerClient types.ExecutionClient) error { diff --git a/runner/network/replay_sequencer_benchmark.go b/runner/network/replay_sequencer_benchmark.go new file mode 100644 index 00000000..7295d21f --- /dev/null +++ b/runner/network/replay_sequencer_benchmark.go @@ -0,0 +1,172 @@ +package network + +import ( + "context" + "time" + + "github.com/base/base-bench/runner/clients/types" + "github.com/base/base-bench/runner/metrics" + "github.com/base/base-bench/runner/network/consensus" + "github.com/base/base-bench/runner/network/mempool" + "github.com/base/base-bench/runner/network/proofprogram/fakel1" + benchtypes "github.com/base/base-bench/runner/network/types" + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/log" + "github.com/pkg/errors" +) + +// ReplaySequencerBenchmark is a sequencer benchmark that replays transactions +// from an external node. It has no setup phase - it directly pulls transactions +// from the source node and builds blocks with them. +type ReplaySequencerBenchmark struct { + log log.Logger + sequencerClient types.ExecutionClient + config benchtypes.TestConfig + l1Chain *l1Chain + + // sourceRPCURL is the RPC endpoint of the node to fetch transactions from + sourceRPCURL string + + // startBlock is the first block to replay transactions from + startBlock uint64 +} + +// NewReplaySequencerBenchmark creates a new replay sequencer benchmark. +func NewReplaySequencerBenchmark( + log log.Logger, + config benchtypes.TestConfig, + sequencerClient types.ExecutionClient, + l1Chain *l1Chain, + sourceRPCURL string, + startBlock uint64, +) *ReplaySequencerBenchmark { + return &ReplaySequencerBenchmark{ + log: log, + config: config, + sequencerClient: sequencerClient, + l1Chain: l1Chain, + sourceRPCURL: sourceRPCURL, + startBlock: startBlock, + } +} + +// Run executes the replay benchmark. It fetches transactions from the source +// node block-by-block and replays them on the benchmark node. +func (rb *ReplaySequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.Collector) ([]engine.ExecutableData, uint64, error) { + params := rb.config.Params + sequencerClient := rb.sequencerClient + + // Get head block from the snapshot to determine starting point + headBlockHeader, err := sequencerClient.Client().HeaderByNumber(ctx, nil) + if err != nil { + rb.log.Warn("Failed to get head block header", "error", err) + return nil, 0, err + } + headBlockHash := headBlockHeader.Hash() + headBlockNumber := headBlockHeader.Number.Uint64() + + // Auto-detect start block from snapshot if not specified + startBlock := rb.startBlock + if startBlock == 0 { + // Start from the next block after the snapshot's head + startBlock = headBlockNumber + 1 + rb.log.Info("Auto-detected start block from snapshot", + "snapshot_head", headBlockNumber, + "start_block", startBlock, + ) + } + + // Create replay mempool that fetches from source node + replayMempool, err := mempool.NewReplayMempool( + rb.log, + rb.sourceRPCURL, + startBlock, + rb.config.Genesis.Config.ChainID, + ) + if err != nil { + return nil, 0, errors.Wrap(err, "failed to create replay mempool") + } + defer replayMempool.Close() + + benchmarkCtx, benchmarkCancel := context.WithCancel(ctx) + defer benchmarkCancel() + + errChan := make(chan error) + payloadResult := make(chan []engine.ExecutableData) + + var l1Chain fakel1.L1Chain + if rb.l1Chain != nil { + l1Chain = rb.l1Chain.chain + } + + go func() { + consensusClient := consensus.NewSequencerConsensusClient( + rb.log, + sequencerClient.Client(), + sequencerClient.AuthClient(), + replayMempool, + consensus.ConsensusClientOptions{ + BlockTime: params.BlockTime, + GasLimit: params.GasLimit, + // No special setup gas limit needed since we're replaying real txs + GasLimitSetup: params.GasLimit, + // Allow tx failures for replay since state may differ from source chain + AllowTxFailures: true, + }, + headBlockHash, + headBlockNumber, + l1Chain, + rb.config.BatcherAddr(), + ) + + payloads := make([]engine.ExecutableData, 0) + blockMetrics := metrics.NewBlockMetrics() + + // Directly run benchmark blocks without setup phase + for i := 0; i < params.NumBlocks; i++ { + blockMetrics.SetBlockNumber(uint64(i) + 1) + + // Propose will fetch transactions from the replay mempool + payload, err := consensusClient.Propose(benchmarkCtx, blockMetrics, false) + if err != nil { + errChan <- err + return + } + + if payload == nil { + errChan <- errors.New("received nil payload from consensus client") + return + } + + rb.log.Info("Built replay block", + "block", payload.Number, + "txs", len(payload.Transactions), + "gas_used", payload.GasUsed, + ) + + time.Sleep(1000 * time.Millisecond) + + err = metricsCollector.Collect(benchmarkCtx, blockMetrics) + if err != nil { + rb.log.Error("Failed to collect metrics", "error", err) + } + payloads = append(payloads, *payload) + } + + err = consensusClient.Stop(benchmarkCtx) + if err != nil { + rb.log.Warn("Failed to stop consensus client", "error", err) + } + + payloadResult <- payloads + }() + + select { + case err := <-errChan: + return nil, 0, err + case payloads := <-payloadResult: + // No setup blocks for replay, so lastSetupBlock is the head block number + return payloads, headBlockNumber, nil + } +} + diff --git a/runner/service.go b/runner/service.go index 7be227f9..03dc323d 100644 --- a/runner/service.go +++ b/runner/service.go @@ -320,7 +320,7 @@ func (s *service) setupBlobsDir(workingDir string) error { return nil } -func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig) (*benchmark.RunResult, error) { +func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig, replayConfig *benchmark.ReplayConfig) (*benchmark.RunResult, error) { s.log.Info(fmt.Sprintf("Running benchmark with params: %+v", params)) @@ -384,7 +384,7 @@ func (s *service) runTest(ctx context.Context, params types.RunParams, workingDi } // Run benchmark - benchmark, err := network.NewNetworkBenchmark(config, s.log, sequencerOptions, validatorOptions, proofConfig, transactionPayload, s.portState) + benchmark, err := network.NewNetworkBenchmark(config, s.log, sequencerOptions, validatorOptions, proofConfig, transactionPayload, s.portState, replayConfig) if err != nil { return nil, errors.Wrap(err, "failed to create network benchmark") } @@ -566,7 +566,7 @@ outerLoop: return errors.Wrap(err, "failed to create output directory") } - metricSummary, err := s.runTest(ctx, c.Params, s.config.DataDir(), outputDir, testPlan.Snapshot, testPlan.ProofProgram, transactionPayloads[c.Params.PayloadID], testPlan.Datadir) + metricSummary, err := s.runTest(ctx, c.Params, s.config.DataDir(), outputDir, testPlan.Snapshot, testPlan.ProofProgram, transactionPayloads[c.Params.PayloadID], testPlan.Datadir, testPlan.Replay) if err != nil { log.Error("Failed to run test", "err", err) metricSummary = &benchmark.RunResult{