diff --git a/crates/client/flashblocks/src/block_assembler.rs b/crates/client/flashblocks/src/block_assembler.rs new file mode 100644 index 00000000..ce443c90 --- /dev/null +++ b/crates/client/flashblocks/src/block_assembler.rs @@ -0,0 +1,205 @@ +//! Block assembly from flashblocks. +//! +//! This module provides the [`BlockAssembler`] which reconstructs blocks from flashblocks. + +use alloy_consensus::{Header, Sealed}; +use alloy_primitives::{B256, Bytes}; +use alloy_rpc_types::Withdrawal; +use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; +use base_flashtypes::{ExecutionPayloadBaseV1, Flashblock}; +use reth_evm::op_revm::L1BlockInfo; +use reth_optimism_primitives::OpBlock; + +use crate::{ExecutionError, ProtocolError, Result}; + +/// Result of assembling a block from flashblocks. +#[derive(Debug, Clone)] +pub struct AssembledBlock { + /// The reconstructed OP block. + pub block: OpBlock, + /// The base payload data from the first flashblock. + pub base: ExecutionPayloadBaseV1, + /// The flashblocks used to assemble this block. + pub flashblocks: Vec, + /// The sealed header for this block. + pub header: Sealed
, +} + +impl AssembledBlock { + /// Extracts L1 block info from the assembled block's body. + /// + /// This extracts the L1 attributes deposited transaction data from the + /// block body, which contains information about the L1 origin. + pub fn l1_block_info(&self) -> Result { + reth_optimism_evm::extract_l1_info(&self.block.body) + .map_err(|e| ExecutionError::L1BlockInfo(e.to_string()).into()) + } +} + +/// Assembles blocks from flashblocks. +/// +/// This component handles the reconstruction of complete blocks from +/// a sequence of flashblocks, extracting transactions, withdrawals, +/// and building the execution payload. +#[derive(Debug, Default)] +pub struct BlockAssembler; + +impl BlockAssembler { + /// Creates a new block assembler. + pub const fn new() -> Self { + Self + } + + /// Assembles a complete block from a slice of flashblocks. + /// + /// # Arguments + /// * `flashblocks` - A slice of flashblocks for a single block number. + /// + /// # Returns + /// An [`AssembledBlock`] containing the reconstructed block and metadata. + /// + /// # Errors + /// Returns an error if: + /// - The flashblocks slice is empty + /// - The first flashblock is missing its base payload + /// - Block conversion fails + pub fn assemble(flashblocks: &[Flashblock]) -> Result { + let first = flashblocks.first().ok_or(ProtocolError::EmptyFlashblocks)?; + let base = first.base.clone().ok_or(ProtocolError::MissingBase)?; + let latest_flashblock = flashblocks.last().ok_or(ProtocolError::EmptyFlashblocks)?; + + let transactions: Vec = flashblocks + .iter() + .flat_map(|flashblock| flashblock.diff.transactions.clone()) + .collect(); + + let withdrawals: Vec = + flashblocks.iter().flat_map(|flashblock| flashblock.diff.withdrawals.clone()).collect(); + + let execution_payload = ExecutionPayloadV3 { + blob_gas_used: 0, + excess_blob_gas: 0, + payload_inner: ExecutionPayloadV2 { + withdrawals, + payload_inner: ExecutionPayloadV1 { + parent_hash: base.parent_hash, + fee_recipient: base.fee_recipient, + state_root: latest_flashblock.diff.state_root, + receipts_root: latest_flashblock.diff.receipts_root, + logs_bloom: latest_flashblock.diff.logs_bloom, + prev_randao: base.prev_randao, + block_number: base.block_number, + gas_limit: base.gas_limit, + gas_used: latest_flashblock.diff.gas_used, + timestamp: base.timestamp, + extra_data: base.extra_data.clone(), + base_fee_per_gas: base.base_fee_per_gas, + block_hash: latest_flashblock.diff.block_hash, + transactions, + }, + }, + }; + + let block: OpBlock = execution_payload + .try_into_block() + .map_err(|e| ExecutionError::BlockConversion(e.to_string()))?; + + let block_header = block.header.clone(); + // Zero block hash for flashblocks since the final hash isn't known yet + let sealed_header = block_header.seal(B256::ZERO); + + Ok(AssembledBlock { block, base, flashblocks: flashblocks.to_vec(), header: sealed_header }) + } +} + +#[cfg(test)] +mod tests { + use alloy_primitives::{Address, Bloom, U256}; + use alloy_rpc_types_engine::PayloadId; + use base_flashtypes::{ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Metadata}; + + use super::*; + use crate::ProtocolError; + + fn create_test_flashblock(index: u64, with_base: bool) -> Flashblock { + Flashblock { + payload_id: PayloadId::default(), + index, + base: if with_base { + Some(ExecutionPayloadBaseV1 { + parent_beacon_block_root: B256::ZERO, + parent_hash: B256::ZERO, + fee_recipient: Address::ZERO, + prev_randao: B256::ZERO, + block_number: 100, + gas_limit: 30_000_000, + timestamp: 1700000000, + extra_data: Bytes::default(), + base_fee_per_gas: U256::from(1000000000u64), + }) + } else { + None + }, + diff: ExecutionPayloadFlashblockDeltaV1 { + state_root: B256::ZERO, + receipts_root: B256::ZERO, + logs_bloom: Bloom::default(), + gas_used: 21000, + block_hash: B256::ZERO, + transactions: vec![], + withdrawals: vec![], + withdrawals_root: B256::ZERO, + blob_gas_used: None, + }, + metadata: Metadata { block_number: 100 }, + } + } + + #[test] + fn test_assemble_single_flashblock() { + let flashblocks = vec![create_test_flashblock(0, true)]; + + let result = BlockAssembler::assemble(&flashblocks); + assert!(result.is_ok()); + + let assembled = result.unwrap(); + assert_eq!(assembled.base.block_number, 100); + assert_eq!(assembled.flashblocks.len(), 1); + } + + #[test] + fn test_assemble_multiple_flashblocks() { + let flashblocks = vec![ + create_test_flashblock(0, true), + create_test_flashblock(1, false), + create_test_flashblock(2, false), + ]; + + let result = BlockAssembler::assemble(&flashblocks); + assert!(result.is_ok()); + + let assembled = result.unwrap(); + assert_eq!(assembled.flashblocks.len(), 3); + } + + #[test] + fn test_assemble_empty_flashblocks_fails() { + let flashblocks: Vec = vec![]; + let result = BlockAssembler::assemble(&flashblocks); + assert!(matches!( + result, + Err(crate::StateProcessorError::Protocol(ProtocolError::EmptyFlashblocks)) + )); + } + + #[test] + fn test_assemble_missing_base_fails() { + let flashblocks = vec![create_test_flashblock(0, false)]; // No base + + let result = BlockAssembler::assemble(&flashblocks); + assert!(matches!( + result, + Err(crate::StateProcessorError::Protocol(ProtocolError::MissingBase)) + )); + } +} diff --git a/crates/client/flashblocks/src/lib.rs b/crates/client/flashblocks/src/lib.rs index abeb2b81..d720f337 100644 --- a/crates/client/flashblocks/src/lib.rs +++ b/crates/client/flashblocks/src/lib.rs @@ -6,6 +6,9 @@ #[macro_use] extern crate tracing; +mod block_assembler; +pub use block_assembler::{AssembledBlock, BlockAssembler}; + mod error; pub use error::{ BuildError, ExecutionError, ProtocolError, ProviderError, Result, StateProcessorError, diff --git a/crates/client/flashblocks/src/processor.rs b/crates/client/flashblocks/src/processor.rs index 3310c9a3..1f8f6eb2 100644 --- a/crates/client/flashblocks/src/processor.rs +++ b/crates/client/flashblocks/src/processor.rs @@ -7,9 +7,7 @@ use alloy_consensus::{ transaction::{Recovered, SignerRecoverable}, }; use alloy_eips::BlockNumberOrTag; -use alloy_primitives::{Address, B256, BlockNumber, Bytes}; -use alloy_rpc_types::Withdrawal; -use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; +use alloy_primitives::{Address, BlockNumber}; use alloy_rpc_types_eth::state::StateOverride; use arc_swap::ArcSwapOption; use base_flashtypes::Flashblock; @@ -28,8 +26,8 @@ use revm_database::states::bundle_state::BundleRetention; use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver}; use crate::{ - ExecutionError, Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder, - ProtocolError, ProviderError, Result, + BlockAssembler, ExecutionError, Metrics, PendingBlocks, PendingBlocksBuilder, + PendingStateBuilder, ProviderError, Result, validation::{ CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy, ReorgDetector, SequenceValidationResult, @@ -218,7 +216,7 @@ where Some(pb) => pb, None => { if flashblock.index == 0 { - return self.build_pending_state(None, &vec![flashblock]); + return self.build_pending_state(None, &[flashblock]); } else { info!(message = "waiting for first Flashblock"); return Ok(None); @@ -278,15 +276,15 @@ where fn build_pending_state( &self, prev_pending_blocks: Option>, - flashblocks: &Vec, + flashblocks: &[Flashblock], ) -> Result>> { // BTreeMap guarantees ascending order of keys while iterating - let mut flashblocks_per_block = BTreeMap::>::new(); + let mut flashblocks_per_block = BTreeMap::>::new(); for flashblock in flashblocks { flashblocks_per_block .entry(flashblock.metadata.block_number) .or_default() - .push(flashblock); + .push(flashblock.clone()); } let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); @@ -322,70 +320,22 @@ where }); for (_block_number, flashblocks) in flashblocks_per_block { - let base = flashblocks - .first() - .ok_or(ProtocolError::EmptyFlashblocks)? - .base - .clone() - .ok_or(ProtocolError::MissingBase)?; - - let latest_flashblock = - flashblocks.last().cloned().ok_or(ProtocolError::EmptyFlashblocks)?; - - let transactions: Vec = flashblocks - .iter() - .flat_map(|flashblock| flashblock.diff.transactions.clone()) - .collect(); - - let withdrawals: Vec = flashblocks - .iter() - .flat_map(|flashblock| flashblock.diff.withdrawals.clone()) - .collect(); - - pending_blocks_builder.with_flashblocks( - flashblocks.iter().map(|&x| x.clone()).collect::>(), - ); + // Use BlockAssembler to reconstruct the block from flashblocks + let assembled = BlockAssembler::assemble(&flashblocks)?; - let execution_payload: ExecutionPayloadV3 = ExecutionPayloadV3 { - blob_gas_used: 0, - excess_blob_gas: 0, - payload_inner: ExecutionPayloadV2 { - withdrawals, - payload_inner: ExecutionPayloadV1 { - parent_hash: base.parent_hash, - fee_recipient: base.fee_recipient, - state_root: latest_flashblock.diff.state_root, - receipts_root: latest_flashblock.diff.receipts_root, - logs_bloom: latest_flashblock.diff.logs_bloom, - prev_randao: base.prev_randao, - block_number: base.block_number, - gas_limit: base.gas_limit, - gas_used: latest_flashblock.diff.gas_used, - timestamp: base.timestamp, - extra_data: base.extra_data.clone(), - base_fee_per_gas: base.base_fee_per_gas, - block_hash: latest_flashblock.diff.block_hash, - transactions, - }, - }, - }; + pending_blocks_builder.with_flashblocks(assembled.flashblocks.clone()); + pending_blocks_builder.with_header(assembled.header.clone()); - let block: OpBlock = execution_payload - .try_into_block() - .map_err(|e| ExecutionError::BlockConversion(e.to_string()))?; - let l1_block_info = reth_optimism_evm::extract_l1_info(&block.body) - .map_err(|e| ExecutionError::L1BlockInfo(e.to_string()))?; - let block_header = block.header.clone(); // prevents us from needing to clone the entire block - let sealed_header = block_header.clone().seal(B256::ZERO); // zero block hash for flashblocks - pending_blocks_builder.with_header(sealed_header); + // Extract L1 block info using the AssembledBlock method + let l1_block_info = assembled.l1_block_info()?; let block_env_attributes = OpNextBlockEnvAttributes { - timestamp: base.timestamp, - suggested_fee_recipient: base.fee_recipient, - prev_randao: base.prev_randao, - gas_limit: base.gas_limit, - parent_beacon_block_root: Some(base.parent_beacon_block_root), - extra_data: base.extra_data.clone(), + timestamp: assembled.base.timestamp, + suggested_fee_recipient: assembled.base.fee_recipient, + prev_randao: assembled.base.prev_randao, + gas_limit: assembled.base.gas_limit, + parent_beacon_block_root: Some(assembled.base.parent_beacon_block_root), + extra_data: assembled.base.extra_data.clone(), }; let evm_env = evm_config @@ -395,7 +345,8 @@ where // Parallel sender recovery - batch all ECDSA operations upfront let recovery_start = Instant::now(); - let txs_with_senders: Vec<(OpTxEnvelope, Address)> = block + let txs_with_senders: Vec<(OpTxEnvelope, Address)> = assembled + .block .body .transactions .par_iter() @@ -414,10 +365,13 @@ where .collect::>()?; self.metrics.sender_recovery_duration.record(recovery_start.elapsed()); + // Clone header before moving block to avoid cloning the entire block + let block_header = assembled.block.header.clone(); + let mut pending_state_builder = PendingStateBuilder::new( self.client.chain_spec(), evm, - block, + assembled.block, prev_pending_blocks.clone(), l1_block_info, state_overrides,