Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions crates/client/flashblocks/src/block_assembler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
//! Block assembly from flashblocks.
//!
//! This module provides the [`BlockAssembler`] which reconstructs blocks from flashblocks.

use alloy_consensus::{Header, Sealed};
use alloy_primitives::{B256, Bytes};
use alloy_rpc_types::Withdrawal;
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use base_flashtypes::{ExecutionPayloadBaseV1, Flashblock};
use reth_evm::op_revm::L1BlockInfo;
use reth_optimism_primitives::OpBlock;

use crate::{ExecutionError, ProtocolError, Result};

/// Result of assembling a block from flashblocks.
#[derive(Debug, Clone)]
pub struct AssembledBlock {
/// The reconstructed OP block.
pub block: OpBlock,
/// The base payload data from the first flashblock.
pub base: ExecutionPayloadBaseV1,
/// The flashblocks used to assemble this block.
pub flashblocks: Vec<Flashblock>,
/// The sealed header for this block.
pub header: Sealed<Header>,
}

impl AssembledBlock {
/// Extracts L1 block info from the assembled block's body.
///
/// This extracts the L1 attributes deposited transaction data from the
/// block body, which contains information about the L1 origin.
pub fn l1_block_info(&self) -> Result<L1BlockInfo> {
reth_optimism_evm::extract_l1_info(&self.block.body)
.map_err(|e| ExecutionError::L1BlockInfo(e.to_string()).into())
}
}

/// Assembles blocks from flashblocks.
///
/// This component handles the reconstruction of complete blocks from
/// a sequence of flashblocks, extracting transactions, withdrawals,
/// and building the execution payload.
#[derive(Debug, Default)]
pub struct BlockAssembler;

impl BlockAssembler {
/// Creates a new block assembler.
pub const fn new() -> Self {
Self
}

/// Assembles a complete block from a slice of flashblocks.
///
/// # Arguments
/// * `flashblocks` - A slice of flashblocks for a single block number.
///
/// # Returns
/// An [`AssembledBlock`] containing the reconstructed block and metadata.
///
/// # Errors
/// Returns an error if:
/// - The flashblocks slice is empty
/// - The first flashblock is missing its base payload
/// - Block conversion fails
pub fn assemble(flashblocks: &[Flashblock]) -> Result<AssembledBlock> {
let first = flashblocks.first().ok_or(ProtocolError::EmptyFlashblocks)?;
let base = first.base.clone().ok_or(ProtocolError::MissingBase)?;
let latest_flashblock = flashblocks.last().ok_or(ProtocolError::EmptyFlashblocks)?;

let transactions: Vec<Bytes> = flashblocks
.iter()
.flat_map(|flashblock| flashblock.diff.transactions.clone())
.collect();

let withdrawals: Vec<Withdrawal> =
flashblocks.iter().flat_map(|flashblock| flashblock.diff.withdrawals.clone()).collect();

let execution_payload = ExecutionPayloadV3 {
blob_gas_used: 0,
excess_blob_gas: 0,
payload_inner: ExecutionPayloadV2 {
withdrawals,
payload_inner: ExecutionPayloadV1 {
parent_hash: base.parent_hash,
fee_recipient: base.fee_recipient,
state_root: latest_flashblock.diff.state_root,
receipts_root: latest_flashblock.diff.receipts_root,
logs_bloom: latest_flashblock.diff.logs_bloom,
prev_randao: base.prev_randao,
block_number: base.block_number,
gas_limit: base.gas_limit,
gas_used: latest_flashblock.diff.gas_used,
timestamp: base.timestamp,
extra_data: base.extra_data.clone(),
base_fee_per_gas: base.base_fee_per_gas,
block_hash: latest_flashblock.diff.block_hash,
transactions,
},
},
};

let block: OpBlock = execution_payload
.try_into_block()
.map_err(|e| ExecutionError::BlockConversion(e.to_string()))?;

let block_header = block.header.clone();
// Zero block hash for flashblocks since the final hash isn't known yet
let sealed_header = block_header.seal(B256::ZERO);

Ok(AssembledBlock { block, base, flashblocks: flashblocks.to_vec(), header: sealed_header })
}
}

#[cfg(test)]
mod tests {
use alloy_primitives::{Address, Bloom, U256};
use alloy_rpc_types_engine::PayloadId;
use base_flashtypes::{ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Metadata};

use super::*;
use crate::ProtocolError;

fn create_test_flashblock(index: u64, with_base: bool) -> Flashblock {
Flashblock {
payload_id: PayloadId::default(),
index,
base: if with_base {
Some(ExecutionPayloadBaseV1 {
parent_beacon_block_root: B256::ZERO,
parent_hash: B256::ZERO,
fee_recipient: Address::ZERO,
prev_randao: B256::ZERO,
block_number: 100,
gas_limit: 30_000_000,
timestamp: 1700000000,
extra_data: Bytes::default(),
base_fee_per_gas: U256::from(1000000000u64),
})
} else {
None
},
diff: ExecutionPayloadFlashblockDeltaV1 {
state_root: B256::ZERO,
receipts_root: B256::ZERO,
logs_bloom: Bloom::default(),
gas_used: 21000,
block_hash: B256::ZERO,
transactions: vec![],
withdrawals: vec![],
withdrawals_root: B256::ZERO,
blob_gas_used: None,
},
metadata: Metadata { block_number: 100 },
}
}

#[test]
fn test_assemble_single_flashblock() {
let flashblocks = vec![create_test_flashblock(0, true)];

let result = BlockAssembler::assemble(&flashblocks);
assert!(result.is_ok());

let assembled = result.unwrap();
assert_eq!(assembled.base.block_number, 100);
assert_eq!(assembled.flashblocks.len(), 1);
}

#[test]
fn test_assemble_multiple_flashblocks() {
let flashblocks = vec![
create_test_flashblock(0, true),
create_test_flashblock(1, false),
create_test_flashblock(2, false),
];

let result = BlockAssembler::assemble(&flashblocks);
assert!(result.is_ok());

let assembled = result.unwrap();
assert_eq!(assembled.flashblocks.len(), 3);
}

#[test]
fn test_assemble_empty_flashblocks_fails() {
let flashblocks: Vec<Flashblock> = vec![];
let result = BlockAssembler::assemble(&flashblocks);
assert!(matches!(
result,
Err(crate::StateProcessorError::Protocol(ProtocolError::EmptyFlashblocks))
));
}

#[test]
fn test_assemble_missing_base_fails() {
let flashblocks = vec![create_test_flashblock(0, false)]; // No base

let result = BlockAssembler::assemble(&flashblocks);
assert!(matches!(
result,
Err(crate::StateProcessorError::Protocol(ProtocolError::MissingBase))
));
}
}
3 changes: 3 additions & 0 deletions crates/client/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#[macro_use]
extern crate tracing;

mod block_assembler;
pub use block_assembler::{AssembledBlock, BlockAssembler};

mod error;
pub use error::{
BuildError, ExecutionError, ProtocolError, ProviderError, Result, StateProcessorError,
Expand Down
96 changes: 25 additions & 71 deletions crates/client/flashblocks/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use alloy_consensus::{
transaction::{Recovered, SignerRecoverable},
};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{Address, B256, BlockNumber, Bytes};
use alloy_rpc_types::Withdrawal;
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use alloy_primitives::{Address, BlockNumber};
use alloy_rpc_types_eth::state::StateOverride;
use arc_swap::ArcSwapOption;
use base_flashtypes::Flashblock;
Expand All @@ -28,8 +26,8 @@ use revm_database::states::bundle_state::BundleRetention;
use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver};

use crate::{
ExecutionError, Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder,
ProtocolError, ProviderError, Result,
BlockAssembler, ExecutionError, Metrics, PendingBlocks, PendingBlocksBuilder,
PendingStateBuilder, ProviderError, Result,
validation::{
CanonicalBlockReconciler, FlashblockSequenceValidator, ReconciliationStrategy,
ReorgDetector, SequenceValidationResult,
Expand Down Expand Up @@ -218,7 +216,7 @@ where
Some(pb) => pb,
None => {
if flashblock.index == 0 {
return self.build_pending_state(None, &vec![flashblock]);
return self.build_pending_state(None, &[flashblock]);
} else {
info!(message = "waiting for first Flashblock");
return Ok(None);
Expand Down Expand Up @@ -278,15 +276,15 @@ where
fn build_pending_state(
&self,
prev_pending_blocks: Option<Arc<PendingBlocks>>,
flashblocks: &Vec<Flashblock>,
flashblocks: &[Flashblock],
) -> Result<Option<Arc<PendingBlocks>>> {
// BTreeMap guarantees ascending order of keys while iterating
let mut flashblocks_per_block = BTreeMap::<BlockNumber, Vec<&Flashblock>>::new();
let mut flashblocks_per_block = BTreeMap::<BlockNumber, Vec<Flashblock>>::new();
for flashblock in flashblocks {
flashblocks_per_block
.entry(flashblock.metadata.block_number)
.or_default()
.push(flashblock);
.push(flashblock.clone());
}

let earliest_block_number = flashblocks_per_block.keys().min().unwrap();
Expand Down Expand Up @@ -322,70 +320,22 @@ where
});

for (_block_number, flashblocks) in flashblocks_per_block {
let base = flashblocks
.first()
.ok_or(ProtocolError::EmptyFlashblocks)?
.base
.clone()
.ok_or(ProtocolError::MissingBase)?;

let latest_flashblock =
flashblocks.last().cloned().ok_or(ProtocolError::EmptyFlashblocks)?;

let transactions: Vec<Bytes> = flashblocks
.iter()
.flat_map(|flashblock| flashblock.diff.transactions.clone())
.collect();

let withdrawals: Vec<Withdrawal> = flashblocks
.iter()
.flat_map(|flashblock| flashblock.diff.withdrawals.clone())
.collect();

pending_blocks_builder.with_flashblocks(
flashblocks.iter().map(|&x| x.clone()).collect::<Vec<Flashblock>>(),
);
// Use BlockAssembler to reconstruct the block from flashblocks
let assembled = BlockAssembler::assemble(&flashblocks)?;

let execution_payload: ExecutionPayloadV3 = ExecutionPayloadV3 {
blob_gas_used: 0,
excess_blob_gas: 0,
payload_inner: ExecutionPayloadV2 {
withdrawals,
payload_inner: ExecutionPayloadV1 {
parent_hash: base.parent_hash,
fee_recipient: base.fee_recipient,
state_root: latest_flashblock.diff.state_root,
receipts_root: latest_flashblock.diff.receipts_root,
logs_bloom: latest_flashblock.diff.logs_bloom,
prev_randao: base.prev_randao,
block_number: base.block_number,
gas_limit: base.gas_limit,
gas_used: latest_flashblock.diff.gas_used,
timestamp: base.timestamp,
extra_data: base.extra_data.clone(),
base_fee_per_gas: base.base_fee_per_gas,
block_hash: latest_flashblock.diff.block_hash,
transactions,
},
},
};
pending_blocks_builder.with_flashblocks(assembled.flashblocks.clone());
pending_blocks_builder.with_header(assembled.header.clone());

let block: OpBlock = execution_payload
.try_into_block()
.map_err(|e| ExecutionError::BlockConversion(e.to_string()))?;
let l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)
.map_err(|e| ExecutionError::L1BlockInfo(e.to_string()))?;
let block_header = block.header.clone(); // prevents us from needing to clone the entire block
let sealed_header = block_header.clone().seal(B256::ZERO); // zero block hash for flashblocks
pending_blocks_builder.with_header(sealed_header);
// Extract L1 block info using the AssembledBlock method
let l1_block_info = assembled.l1_block_info()?;

let block_env_attributes = OpNextBlockEnvAttributes {
timestamp: base.timestamp,
suggested_fee_recipient: base.fee_recipient,
prev_randao: base.prev_randao,
gas_limit: base.gas_limit,
parent_beacon_block_root: Some(base.parent_beacon_block_root),
extra_data: base.extra_data.clone(),
timestamp: assembled.base.timestamp,
suggested_fee_recipient: assembled.base.fee_recipient,
prev_randao: assembled.base.prev_randao,
gas_limit: assembled.base.gas_limit,
parent_beacon_block_root: Some(assembled.base.parent_beacon_block_root),
extra_data: assembled.base.extra_data.clone(),
};

let evm_env = evm_config
Expand All @@ -395,7 +345,8 @@ where

// Parallel sender recovery - batch all ECDSA operations upfront
let recovery_start = Instant::now();
let txs_with_senders: Vec<(OpTxEnvelope, Address)> = block
let txs_with_senders: Vec<(OpTxEnvelope, Address)> = assembled
.block
.body
.transactions
.par_iter()
Expand All @@ -414,10 +365,13 @@ where
.collect::<Result<_>>()?;
self.metrics.sender_recovery_duration.record(recovery_start.elapsed());

// Clone header before moving block to avoid cloning the entire block
let block_header = assembled.block.header.clone();

let mut pending_state_builder = PendingStateBuilder::new(
self.client.chain_spec(),
evm,
block,
assembled.block,
prev_pending_blocks.clone(),
l1_block_info,
state_overrides,
Expand Down
Loading