Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions runner/benchmark/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ type DatadirConfig struct {
Validator *string `yaml:"validator"`
}

// ReplayConfig specifies configuration for replaying transactions from an
// external node instead of generating synthetic transactions.
type ReplayConfig struct {
// SourceRPCURL is the RPC endpoint of the node to fetch transactions from
SourceRPCURL string `yaml:"source_rpc_url"`

// StartBlock is the first block to replay transactions from.
// If not specified (0), it will be automatically detected from the
// snapshot's head block + 1.
StartBlock uint64 `yaml:"start_block,omitempty"`
}

// TestDefinition is the user-facing YAML configuration for specifying a
// matrix of benchmark runs.
type TestDefinition struct {
Expand All @@ -108,6 +120,7 @@ type TestDefinition struct {
Tags *map[string]string `yaml:"tags"`
Variables []Param `yaml:"variables"`
ProofProgram *ProofProgramOptions `yaml:"proof_program"`
Replay *ReplayConfig `yaml:"replay"`
}

func (bc *TestDefinition) Check() error {
Expand Down
2 changes: 2 additions & 0 deletions runner/benchmark/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type TestPlan struct {
Snapshot *SnapshotDefinition
ProofProgram *ProofProgramOptions
Thresholds *ThresholdConfig
Replay *ReplayConfig
}

func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *BenchmarkConfig) (*TestPlan, error) {
Expand All @@ -42,6 +43,7 @@ func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *Benchm
Snapshot: c.Snapshot,
ProofProgram: proofProgram,
Thresholds: c.Metrics,
Replay: c.Replay,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions runner/network/consensus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type ConsensusClientOptions struct {
GasLimit uint64
// GasLimitSetup is the gas limit for the setup payload
GasLimitSetup uint64
// AllowTxFailures allows transactions to fail without stopping the benchmark.
// When true, failed transactions are logged as warnings instead of errors.
// Useful for replay mode where some transactions may fail due to state differences.
AllowTxFailures bool
}

// BaseConsensusClient contains common functionality shared between different consensus client implementations.
Expand Down
6 changes: 3 additions & 3 deletions runner/network/consensus/validator_consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex
}

// Start starts the fake consensus client.
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64) error {
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, lastSetupBlock uint64) error {
f.log.Info("Starting sync benchmark", "num_payloads", len(payloads))
m := metrics.NewBlockMetrics()
for i := 0; i < len(payloads); i++ {
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock))))
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(lastSetupBlock))))
f.log.Info("Proposing payload", "payload_index", i)
err := f.propose(ctx, &payloads[i], m)
if err != nil {
return err
}

if payloads[i].Number >= firstTestBlock {
if payloads[i].Number > lastSetupBlock {
err = metricsCollector.Collect(ctx, m)
if err != nil {
f.log.Error("Failed to collect metrics", "error", err)
Expand Down
136 changes: 136 additions & 0 deletions runner/network/mempool/replay_mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package mempool

import (
"context"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)

// ReplayMempool fetches transactions from an external node and replays them.
// It iterates through blocks from a source node and provides transactions
// block-by-block for the benchmark to replay.
type ReplayMempool struct {
log log.Logger
client *ethclient.Client

lock sync.Mutex

// startBlock is the first block to fetch transactions from
startBlock uint64

// currentBlock tracks which block we're fetching next
currentBlock uint64

// chainID for transaction signing validation
chainID *big.Int

// addressNonce tracks the latest nonce for each address
addressNonce map[common.Address]uint64
}

// NewReplayMempool creates a new ReplayMempool that fetches transactions
// from the given RPC endpoint starting from the specified block.
func NewReplayMempool(log log.Logger, rpcURL string, startBlock uint64, chainID *big.Int) (*ReplayMempool, error) {
client, err := ethclient.Dial(rpcURL)
if err != nil {
return nil, err
}

return &ReplayMempool{
log: log,
client: client,
startBlock: startBlock,
currentBlock: startBlock,
chainID: chainID,
addressNonce: make(map[common.Address]uint64),
}, nil
}

// AddTransactions is a no-op for ReplayMempool since transactions come from the source node.
func (m *ReplayMempool) AddTransactions(_ []*types.Transaction) {
// No-op: transactions are fetched from the source node, not added manually
}

// NextBlock fetches the next block from the source node and returns its transactions.
// Returns (mempoolTxs, sequencerTxs) where:
// - mempoolTxs: regular transactions to be sent via eth_sendRawTransaction
// - sequencerTxs: deposit transactions to be included in payload attributes
func (m *ReplayMempool) NextBlock() ([][]byte, [][]byte) {
m.lock.Lock()
defer m.lock.Unlock()

ctx := context.Background()

block, err := m.client.BlockByNumber(ctx, big.NewInt(int64(m.currentBlock)))
if err != nil {
m.log.Warn("Failed to fetch block", "block", m.currentBlock, "error", err)
return nil, nil
}

m.log.Info("Fetched block for replay",
"block", m.currentBlock,
"txs", len(block.Transactions()),
"gas_used", block.GasUsed(),
)

m.currentBlock++

mempoolTxs := make([][]byte, 0)
sequencerTxs := make([][]byte, 0)

for _, tx := range block.Transactions() {
// Track nonces for GetTransactionCount
from, err := types.Sender(types.NewIsthmusSigner(m.chainID), tx)
if err != nil {
// Try with London signer for older transactions
from, err = types.Sender(types.NewLondonSigner(m.chainID), tx)
if err != nil {
m.log.Warn("Failed to get sender", "tx", tx.Hash(), "error", err)
continue
}
}
m.addressNonce[from] = tx.Nonce()

txBytes, err := tx.MarshalBinary()
if err != nil {
m.log.Warn("Failed to marshal transaction", "tx", tx.Hash(), "error", err)
continue
}

// Deposit transactions go to sequencer, others go to mempool
if tx.Type() == types.DepositTxType {
sequencerTxs = append(sequencerTxs, txBytes)
} else {
mempoolTxs = append(mempoolTxs, txBytes)
}
}

return mempoolTxs, sequencerTxs
}

// GetTransactionCount returns the latest nonce for an address.
func (m *ReplayMempool) GetTransactionCount(address common.Address) uint64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.addressNonce[address]
}

// CurrentBlock returns the current block number being replayed.
func (m *ReplayMempool) CurrentBlock() uint64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.currentBlock
}

// Close closes the underlying RPC client connection.
func (m *ReplayMempool) Close() {
m.client.Close()
}

var _ FakeMempool = &ReplayMempool{}

32 changes: 27 additions & 5 deletions runner/network/network_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ type NetworkBenchmark struct {

transactionPayload payload.Definition
ports portmanager.PortManager
replayConfig *benchmark.ReplayConfig
}

// NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client
func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager) (*NetworkBenchmark, error) {
func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager, replayConfig *benchmark.ReplayConfig) (*NetworkBenchmark, error) {
return &NetworkBenchmark{
log: log,
sequencerOptions: sequencerOptions,
Expand All @@ -57,6 +58,7 @@ func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequence
proofConfig: proofConfig,
transactionPayload: transactionPayload,
ports: ports,
replayConfig: replayConfig,
}, nil
}

Expand Down Expand Up @@ -107,15 +109,35 @@ func (nb *NetworkBenchmark) benchmarkSequencer(ctx context.Context, l1Chain *l1C
}
}()

benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload)
executionData, lastBlock, err := benchmark.Run(ctx, metricsCollector)
var executionData []engine.ExecutableData
var lastBlock uint64

// Use replay benchmark if replay config is provided
if nb.replayConfig != nil {
nb.log.Info("Using replay sequencer benchmark",
"source_rpc", nb.replayConfig.SourceRPCURL,
"start_block", nb.replayConfig.StartBlock,
)
replayBenchmark := NewReplaySequencerBenchmark(
nb.log,
*nb.testConfig,
sequencerClient,
l1Chain,
nb.replayConfig.SourceRPCURL,
nb.replayConfig.StartBlock,
)
executionData, lastBlock, err = replayBenchmark.Run(ctx, metricsCollector)
} else {
benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload)
executionData, lastBlock, err = benchmark.Run(ctx, metricsCollector)
}

if err != nil {
sequencerClient.Stop()
return nil, 0, nil, fmt.Errorf("failed to run sequencer benchmark: %w", err)
return nil, 0, nil, err
}

return executionData, lastBlock, sequencerClient, nil
return executionData, lastBlock, sequencerClient, err
}

func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64, l1Chain *l1Chain, sequencerClient types.ExecutionClient) error {
Expand Down
Loading
Loading