diff --git a/node/src/components/block_synchronizer/block_acquisition.rs b/node/src/components/block_synchronizer/block_acquisition.rs index 2b0ead6f94..695c4c4fc2 100644 --- a/node/src/components/block_synchronizer/block_acquisition.rs +++ b/node/src/components/block_synchronizer/block_acquisition.rs @@ -383,7 +383,9 @@ impl BlockAcquisitionState { ) => { if false == is_historical { Err(BlockAcquisitionError::InvalidStateTransition) - } else if transaction_state.needs_transaction() { + } else if block.transaction_count() == 0 || transaction_state.needs_transaction() { + // There is an execution result checksum and there is a derived utilization + // score that is meaningfull even when there are no transactions. BlockAcquisitionAction::maybe_execution_results( block, peer_list, @@ -506,7 +508,8 @@ impl BlockAcquisitionState { } BlockAcquisitionState::HaveStrictFinalitySignatures(block, ..) => { if is_historical { - // we have enough signatures; need to make sure we've stored the necessary bits + // we have enough signatures; need to make sure we've + // stored the necessary bits Ok(BlockAcquisitionAction::block_marked_complete( *block.hash(), block.height(), @@ -1199,7 +1202,7 @@ impl BlockAcquisitionState { Ok(()) } - /// Register a transactions for this block. + /// Register a transaction for this block. pub(super) fn register_transaction( &mut self, txn_id: TransactionId, diff --git a/node/src/components/block_synchronizer/tests.rs b/node/src/components/block_synchronizer/tests.rs index 75d869637a..71a305eb8b 100644 --- a/node/src/components/block_synchronizer/tests.rs +++ b/node/src/components/block_synchronizer/tests.rs @@ -2360,13 +2360,15 @@ fn historical_state(block_synchronizer: &BlockSynchronizer) -> &BlockAcquisition .block_acquisition_state() } -/// When there is no deploy, the state goes from `HaveGlobalState` to `HaveStrictFinalitySignature` -/// directly, skipping `HaveAllExecutionResults`, `HaveApprovalsHashes` and `HaveAllTransactions`. +/// Even if a block has no transaction it needs to go through "the regular" states becasue in those +/// states we calculate utilization tracking for the block (even if it's empty we still need to +/// calculate it's utilization). #[tokio::test] -async fn historical_sync_skips_exec_results_and_deploys_if_block_empty() { +async fn historical_sync_does_not_skip_exec_results_if_block_empty() { let rng = &mut TestRng::new(); let mock_reactor = MockReactor::new(); - let test_env = TestEnv::random(rng); + let test_env = + TestEnv::random(rng).with_block(TestBlockBuilder::new().era(1).build(rng).into()); let peers = test_env.peers(); let block = test_env.block(); let validator_matrix = test_env.gen_validator_matrix(); @@ -2381,8 +2383,6 @@ async fn historical_sync_skips_exec_results_and_deploys_if_block_empty() { assert!(block_synchronizer.forward.is_none()); block_synchronizer.register_peers(*block.hash(), peers.clone()); - // Skip steps HaveBlockHeader, HaveWeakFinalitySignature, HaveBlock - let historical_builder = block_synchronizer .historical .as_mut() @@ -2422,17 +2422,8 @@ async fn historical_sync_skips_exec_results_and_deploys_if_block_empty() { Event::GlobalStateSynchronizer(global_state_synchronizer::Event::Request(request)), ); - // ----- HaveBlock ----- - assert_matches!( - historical_state(&block_synchronizer), - BlockAcquisitionState::HaveBlock { .. } - ); - // Those effects are handled directly and not through the reactor: - let events = effects - .try_one() - .expect("there should be only one effect") - .await; + let events = effects.one().await; assert_matches!( events.try_one(), Some(Event::GlobalStateSynchronizer( @@ -2440,6 +2431,15 @@ async fn historical_sync_skips_exec_results_and_deploys_if_block_empty() { )) ); + // ----- HaveBlock ----- + assert_matches!( + historical_state(&block_synchronizer), + BlockAcquisitionState::HaveBlock { .. } + ); + + // Let's not test the detail of the global synchronization event, + // since it is already tested in its unit tests. + let effects = block_synchronizer.handle_event( mock_reactor.effect_builder(), rng, @@ -2457,11 +2457,115 @@ async fn historical_sync_skips_exec_results_and_deploys_if_block_empty() { historical_state(&block_synchronizer), BlockAcquisitionState::HaveGlobalState { .. } ); + + let events = mock_reactor.process_effects(effects).await; + + match events.try_one() { + Some(MockReactorEvent::ContractRuntimeRequest( + ContractRuntimeRequest::GetExecutionResultsChecksum { + state_root_hash, + responder, + }, + )) => responder.respond(ExecutionResultsChecksumResult::Success { checksum: state_root_hash }).await, + other => panic!("Event should be of type `ContractRuntimeRequest(ContractRuntimeRequest::GetExecutionResultsChecksum) but it is {:?}", other), + } + + let effects = block_synchronizer.handle_event( + mock_reactor.effect_builder(), + rng, + Event::GotExecutionResultsChecksum { + block_hash: *block.hash(), + result: ExecutionResultsChecksumResult::Success { + checksum: Digest::SENTINEL_NONE, + }, + }, + ); + let events = mock_reactor.process_effects(effects).await; + + for event in events { + assert_matches!( + event, + MockReactorEvent::BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest { .. }) + ); + } + + let execution_results = BlockExecutionResultsOrChunk::new_empty_value(*block.hash()); + let effects = block_synchronizer.handle_event( + mock_reactor.effect_builder(), + rng, + Event::ExecutionResultsFetched { + block_hash: *block.hash(), + result: Ok(FetchedData::from_storage(Box::new(execution_results))), + }, + ); + + let mut events = mock_reactor.process_effects(effects).await; + + assert_matches!( + historical_state(&block_synchronizer), + BlockAcquisitionState::HaveGlobalState { .. } + ); + + assert_matches!( + events.remove(0), + MockReactorEvent::StorageRequest(StorageRequest::PutExecutionResults { .. }) + ); + for event in events { + assert_matches!( + event, + MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest { .. }) + ); + } + + let effects = block_synchronizer.handle_event( + mock_reactor.effect_builder(), + rng, + Event::ExecutionResultsStored(*block.hash()), + ); + // ----- HaveAllExecutionResults ----- + assert_matches!( + historical_state(&block_synchronizer), + BlockAcquisitionState::HaveAllExecutionResults(_, _, _, checksum) if checksum.is_checkable() + ); + let events = mock_reactor.process_effects(effects).await; for event in events { - assert_matches!(event, MockReactorEvent::FinalitySignatureFetcherRequest(..)); + assert_matches!( + event, + MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest { .. }) + ); } + + let effects = block_synchronizer.handle_event( + mock_reactor.effect_builder(), + rng, + Event::ApprovalsHashesFetched(Ok(FetchedData::from_storage(Box::new( + ApprovalsHashes::new(*block.hash(), vec![], dummy_merkle_proof()), + )))), + ); + // ----- HaveApprovalsHashes ----- + assert_matches!( + historical_state(&block_synchronizer), + BlockAcquisitionState::HaveApprovalsHashes(_, _, _) + ); + + let events = mock_reactor.process_effects(effects).await; + assert!(!events.is_empty()); + // Since the block doesn't have any transactions, + // the next step should be to fetch the finality signatures for strict finality. + for event in events { + assert_matches!( + event, + MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest { + id, + peer, + .. + }) if peers.contains(&peer) && id.block_hash() == block.hash() && id.era_id() == block.era_id() + ); + } + + // The rest would be fetching finality signatures which is covered by other tests } #[tokio::test] diff --git a/node/src/components/consensus/era_supervisor.rs b/node/src/components/consensus/era_supervisor.rs index 01ce54b164..f794ed6317 100644 --- a/node/src/components/consensus/era_supervisor.rs +++ b/node/src/components/consensus/era_supervisor.rs @@ -1331,6 +1331,11 @@ impl EraSupervisor { let initial_era_height = self.era(era_id).start_height; initial_era_height.saturating_add(block_context.ancestor_values().len() as u64) } + + // What is the block height of the next block we expect to execute? + pub(crate) fn next_executed_height(&self) -> u64 { + self.next_executed_height + } } /// A serialized consensus network message. diff --git a/node/src/components/contract_runtime.rs b/node/src/components/contract_runtime.rs index 4325fc1a27..3c0771780b 100644 --- a/node/src/components/contract_runtime.rs +++ b/node/src/components/contract_runtime.rs @@ -54,7 +54,7 @@ use crate::{ effect::{ announcements::{ ContractRuntimeAnnouncement, FatalAnnouncement, MetaBlockAnnouncement, - UnexecutedBlockAnnouncement, + NonExecutableBlockAnnouncement, UnexecutedBlockAnnouncement, }, incoming::{TrieDemand, TrieRequest as TrieRequestMessage, TrieRequestIncoming}, requests::{ContractRuntimeRequest, NetworkRequest, StorageRequest}, @@ -81,7 +81,7 @@ pub(crate) use types::{ BlockAndExecutionArtifacts, ExecutionArtifact, ExecutionPreState, SpeculativeExecutionResult, StepOutcome, }; -use utils::{exec_or_requeue, run_intensive_task}; +use utils::{exec_and_check_next, run_intensive_task}; const COMPONENT_NAME: &str = "contract_runtime"; @@ -196,7 +196,7 @@ impl ContractRuntime { }) } - pub(crate) fn set_initial_state(&mut self, sequential_block_state: ExecutionPreState) { + pub(crate) fn set_execution_pre_state(&mut self, sequential_block_state: ExecutionPreState) { let next_block_height = sequential_block_state.next_block_height(); let mut execution_pre_state = self.execution_pre_state.lock().unwrap(); *execution_pre_state = sequential_block_state; @@ -208,6 +208,12 @@ impl ContractRuntime { debug!(next_block_height, "ContractRuntime: set initial state"); } + /// Returns the current execution prestate. + pub(crate) fn execution_pre_state(&self) -> ExecutionPreState { + let execution_pre_state = self.execution_pre_state.lock().unwrap(); + execution_pre_state.clone() + } + fn new_data_access_layer( storage_dir: &Path, contract_runtime_config: &Config, @@ -314,6 +320,7 @@ impl ContractRuntime { + From + From + From + + From + Send, { match request { @@ -540,7 +547,7 @@ impl ContractRuntime { } ContractRuntimeRequest::UpdatePreState { new_pre_state } => { let next_block_height = new_pre_state.next_block_height(); - self.set_initial_state(new_pre_state); + self.set_execution_pre_state(new_pre_state); let current_price = self.current_gas_price.gas_price(); async move { let block_header = match effect_builder @@ -635,7 +642,7 @@ impl ContractRuntime { let current_pre_state = self.execution_pre_state.lock().unwrap(); let next_block_height = current_pre_state.next_block_height(); match finalized_block_height.cmp(&next_block_height) { - // An old block: it won't be executed: + // An old block: it won't be enqueued: Ordering::Less => { debug!( %era_id, @@ -645,7 +652,7 @@ impl ContractRuntime { ); effects.extend( effect_builder - .announce_unexecuted_block(finalized_block_height) + .announce_not_enqueuing_old_executable_block(finalized_block_height) .ignore(), ); } @@ -683,8 +690,13 @@ impl ContractRuntime { let chainspec = Arc::clone(&self.chainspec); let metrics = Arc::clone(&self.metrics); let shared_pre_state = Arc::clone(&self.execution_pre_state); + // the way this works is inobvious. if the current executable block + // executes and its child is enqueued the underlying logic will + // update the pre-state to refer to the child, pop the child from the queue, + // and send a new event of this kind with the child. it will then get into + // this match arm and get executed without being re-enqueued. effects.extend( - exec_or_requeue( + exec_and_check_next( data_access_layer, execution_engine_v1, execution_engine_v2, @@ -843,6 +855,7 @@ where + From + From + From + + From + Send, { type Event = Event; diff --git a/node/src/components/contract_runtime/tests.rs b/node/src/components/contract_runtime/tests.rs index 490f14ef56..c147f774a6 100644 --- a/node/src/components/contract_runtime/tests.rs +++ b/node/src/components/contract_runtime/tests.rs @@ -89,6 +89,8 @@ impl Unhandled for NetworkRequest {} impl Unhandled for UnexecutedBlockAnnouncement {} +impl Unhandled for NonExecutableBlockAnnouncement {} + struct TestConfig { config: Config, fixture_name: Option, @@ -261,7 +263,7 @@ async fn should_not_set_shared_pre_state_to_lower_block_height() { .reactor_mut() .inner_mut() .contract_runtime - .set_initial_state(initial_pre_state); + .set_execution_pre_state(initial_pre_state); // Create the genesis immediate switch block. let block_0 = ExecutableBlock::from_finalized_block_and_transactions( @@ -398,7 +400,7 @@ async fn should_not_set_shared_pre_state_to_lower_block_height() { .reactor_mut() .inner_mut() .contract_runtime - .set_initial_state(ExecutionPreState::new( + .set_execution_pre_state(ExecutionPreState::new( next_block_height, Digest::hash(rng.next_u64().to_le_bytes()), BlockHash::random(rng), @@ -535,7 +537,7 @@ async fn should_correctly_manage_entity_version_calls() { .reactor_mut() .inner_mut() .contract_runtime - .set_initial_state(initial_pre_state); + .set_execution_pre_state(initial_pre_state); // Create the genesis immediate switch block. let block_0 = ExecutableBlock::from_finalized_block_and_transactions( diff --git a/node/src/components/contract_runtime/utils.rs b/node/src/components/contract_runtime/utils.rs index 7e0c7b1509..b2324d8dfd 100644 --- a/node/src/components/contract_runtime/utils.rs +++ b/node/src/components/contract_runtime/utils.rs @@ -9,7 +9,7 @@ use std::{ sync::{Arc, Mutex}, time::Instant, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; use crate::{ contract_runtime::{ @@ -19,7 +19,10 @@ use crate::{ rewards, BlockAndExecutionArtifacts, BlockExecutionError, ExecutionPreState, StepOutcome, }, effect::{ - announcements::{ContractRuntimeAnnouncement, FatalAnnouncement, MetaBlockAnnouncement}, + announcements::{ + ContractRuntimeAnnouncement, FatalAnnouncement, MetaBlockAnnouncement, + NonExecutableBlockAnnouncement, + }, requests::{ContractRuntimeRequest, StorageRequest}, EffectBuilder, }, @@ -68,21 +71,30 @@ where } } -#[allow(clippy::too_many_arguments)] -pub(super) async fn exec_or_requeue( +// Maybe era end processing instructions. +#[derive(Debug)] +enum EraEndInstruction { + // Is not a switch block. + ExecNonSwitch, + // Is a switch block, and we can calc next era gas price, thus we can exec. + ExecSwitch { next_gas_price: u8 }, + // Is a switch block, but we cannot execute. + NoExec, + // Fatal with error string. + Fatal(String), +} + +/// This currently handles reward and dynamic gas price calculation. If in future +/// similar end of era determinations need to be made, they should potentially +/// be added here. +async fn handle_era_end( data_access_layer: Arc>, - execution_engine_v1: Arc, - execution_engine_v2: ExecutorV2, chainspec: Arc, metrics: Arc, - mut exec_queue: ExecQueue, - shared_pre_state: Arc>, - current_pre_state: ExecutionPreState, effect_builder: EffectBuilder, - mut executable_block: ExecutableBlock, - key_block_height_for_activation_point: u64, - mut meta_block_state: MetaBlockState, -) where + executable_block: &mut ExecutableBlock, +) -> EraEndInstruction +where REv: From + From + From @@ -90,14 +102,13 @@ pub(super) async fn exec_or_requeue( + From + Send, { - debug!("ContractRuntime: execute_finalized_block_or_requeue"); - let contract_runtime_metrics = metrics.clone(); - let is_era_end = executable_block.era_report.is_some(); - let current_gas_price = executable_block.current_gas_price; - let era_id = executable_block.era_id; - let block_height = executable_block.height; + if executable_block.era_report.is_none() { + return EraEndInstruction::ExecNonSwitch; + } + // this logic could be further broken down to each part if desired - if is_era_end && executable_block.rewards.is_none() { + // reward stuff + if executable_block.rewards.is_none() { executable_block.rewards = Some(if chainspec.core_config.compute_rewards { let rewards = match rewards::fetch_data_and_calculate_rewards_for_era( effect_builder, @@ -110,7 +121,9 @@ pub(super) async fn exec_or_requeue( { Ok(rewards) => rewards, Err(e) => { - return fatal!(effect_builder, "Failed to compute the rewards: {e:?}").await; + return EraEndInstruction::Fatal(format!( + "Failed to compute the rewards: {e:?}" + )); } }; @@ -122,161 +135,146 @@ pub(super) async fn exec_or_requeue( }); } - let maybe_next_era_gas_price = if is_era_end && executable_block.next_era_gas_price.is_none() { - let max_block_size = chainspec.transaction_config.max_block_size as u64; - let block_gas_limit = chainspec.transaction_config.block_gas_limit; - let go_up = chainspec.vacancy_config.upper_threshold; - let go_down = chainspec.vacancy_config.lower_threshold; - let max = chainspec.vacancy_config.max_gas_price; - let min = chainspec.vacancy_config.min_gas_price; - info!(%era_id, %block_height, "End of era calculating new gas price"); - let era_id = executable_block.era_id; - let block_height = executable_block.height; - - let per_block_capacity = chainspec - .transaction_config - .transaction_v1_config - .get_max_block_count(); - - let switch_block_utilization_score = { - let mut has_hit_slot_limt = false; - let mut transaction_hash_to_lane_id = HashMap::new(); - - for (lane_id, transactions) in executable_block.transaction_map.iter() { - transaction_hash_to_lane_id.extend( - transactions - .iter() - .map(|transaction| (transaction, *lane_id)), - ); - let max_count = chainspec - .transaction_config - .transaction_v1_config - .get_max_transaction_count(*lane_id); - if max_count == transactions.len() as u64 { - has_hit_slot_limt = true; - } - } - - if has_hit_slot_limt { - 100u64 - } else if executable_block.transactions.is_empty() { - 0u64 - } else { - let size_utilization: u64 = { - let total_size_of_transactions: u64 = executable_block - .transactions - .iter() - .map(|transaction| transaction.size_estimate() as u64) - .sum(); - - Ratio::new(total_size_of_transactions * 100, max_block_size).to_integer() - }; - let gas_utilization: u64 = { - let total_gas_limit: u64 = executable_block - .transactions - .iter() - .map(|transaction| { - match transaction_hash_to_lane_id.get(&transaction.hash()) { - Some(lane_id) => { - match &transaction.gas_limit(&chainspec, *lane_id) { - Ok(gas_limit) => gas_limit.value().as_u64(), - Err(_) => { - warn!("Unable to determine gas limit"); - 0u64 - } - } - } - None => { - warn!("Unable to determine gas limit"); - 0u64 - } - } - }) - .sum(); - - Ratio::new(total_gas_limit * 100, block_gas_limit).to_integer() - }; - - let slot_utilization = Ratio::new( - executable_block.transactions.len() as u64 * 100, - per_block_capacity, - ) - .to_integer(); - - let utilization_scores = [slot_utilization, gas_utilization, size_utilization]; + // dynamic gas price stuff + let era_id = executable_block.era_id; + let block_height = executable_block.height; + info!(%era_id, %block_height, "End of era calculating new gas price"); - match utilization_scores.iter().max() { - Some(max_score) => *max_score, - None => { - let error = BlockExecutionError::FailedToGetNewEraGasPrice { era_id }; - return fatal!(effect_builder, "{}", error).await; - } - } + if let Some(next_gas_price) = executable_block.next_era_gas_price { + // keep up nodes are executing a block as determined by validators + // and the next era gas price is already determined + return EraEndInstruction::ExecSwitch { next_gas_price }; + } + // we need to calculate the utilization of the block we are about to execute + // and include it in the tally of the utilization for the entire era. + let executable_block_utilization_score = + match executable_block.calc_utilization_score(&chainspec) { + Some(score) => score, + None => { + return EraEndInstruction::Fatal(format!( + "could not calc utilization of executable block {}", + block_height + )); } }; - let maybe_utilization = effect_builder - .get_block_utilization(era_id, block_height, switch_block_utilization_score) - .await; - debug!( - %era_id, - %block_height, - ?maybe_utilization, - "Calculated utilization for block" - ); - - match maybe_utilization { - None => { - let error = BlockExecutionError::FailedToGetNewEraGasPrice { era_id }; - return fatal!(effect_builder, "{}", error).await; - } - Some((utilization, block_count)) => { - let era_score = { Ratio::new(utilization, block_count).to_integer() }; - - let new_gas_price = if era_score >= go_up { - let new_gas_price = current_gas_price.saturating_add(1); - if new_gas_price > max { - max - } else { - new_gas_price - } - } else if era_score <= go_down { - let new_gas_price = current_gas_price.saturating_sub(1); - if new_gas_price <= min { - min - } else { - new_gas_price - } - } else { - current_gas_price - }; - info!(%new_gas_price, "Calculated new gas price"); - Some(new_gas_price) + // BLOCKING CALL + match effect_builder + .get_era_utilization(era_id, block_height, executable_block_utilization_score) + .await + { + Some((utilization, block_count, total_block_count)) => { + if block_count != total_block_count { + // The node needs awareness of all of the blocks for the era for which it tries to + // produce the switch block. + return EraEndInstruction::NoExec; } + + let current_gas_price = executable_block.current_gas_price; + let era_score = { Ratio::new(utilization, block_count).to_integer() }; + + let go_up = chainspec.vacancy_config.upper_threshold; + let go_down = chainspec.vacancy_config.lower_threshold; + let max = chainspec.vacancy_config.max_gas_price; + let min = chainspec.vacancy_config.min_gas_price; + let next_gas_price = if era_score >= go_up { + current_gas_price.saturating_add(1).min(max) + } else if era_score <= go_down { + current_gas_price.saturating_sub(1).max(min) + } else { + current_gas_price + }; + info!(%next_gas_price, "Calculated new gas price"); + EraEndInstruction::ExecSwitch { next_gas_price } } - } else if executable_block.next_era_gas_price.is_some() { - debug!( - %era_id, - %block_height, - next_era_gas_price = executable_block.next_era_gas_price, - "New gas price obtained from block" - ); - executable_block.next_era_gas_price - } else { - None - }; + None => { + let error = BlockExecutionError::FailedToGetNewEraGasPrice { era_id }; + EraEndInstruction::Fatal(format!("{}", error)) + } + } +} + +/// This function can fatal. +#[allow(clippy::too_many_arguments)] +pub(super) async fn exec_and_check_next( + data_access_layer: Arc>, + execution_engine_v1: Arc, + execution_engine_v2: ExecutorV2, + chainspec: Arc, + metrics: Arc, + mut exec_queue: ExecQueue, + shared_pre_state: Arc>, + current_pre_state: ExecutionPreState, + effect_builder: EffectBuilder, + mut executable_block: ExecutableBlock, + key_block_height_for_activation_point: u64, + mut meta_block_state: MetaBlockState, +) where + REv: From + + From + + From + + From + + From + + From + + Send, +{ + debug!("ContractRuntime: execute_finalized_block_or_requeue"); + // FIRST determine if we are aware of the last switch block header let era_id = executable_block.era_id; + let last_switch_block_hash = match era_id.predecessor() { + Some(previous_era) => { + let switch_block_header = effect_builder + .get_switch_block_header_by_era_id_from_storage(previous_era) + .await; + if switch_block_header.is_none() { + return fatal!( + effect_builder, + "switch block header can only be none for genesis era" + ) + .await; + } + switch_block_header.map(|header| header.block_hash()) + } + None => { + // genesis era + None + } + }; - let last_switch_block_hash = if let Some(previous_era) = era_id.predecessor() { - let switch_block_header = effect_builder - .get_switch_block_header_by_era_id_from_storage(previous_era) - .await; - switch_block_header.map(|header| header.block_hash()) - } else { - None + let era_end_instruction = handle_era_end( + data_access_layer.clone(), + chainspec.clone(), + metrics.clone(), + effect_builder, + &mut executable_block, + ) + .await; + debug!(?era_end_instruction, "era_end_instruction"); + let maybe_next_era_gas_price = match era_end_instruction { + EraEndInstruction::ExecNonSwitch => None, + EraEndInstruction::ExecSwitch { next_gas_price } => Some(next_gas_price), + EraEndInstruction::NoExec => { + // This means that we don't have enough data to calculate the era_end field + // The best thing we can do here is force the node to CatchUp with the hope + // that it will either acquire the missing state or the network will progress + // and we will move past this point. + info!( + block_height = executable_block.height, + "ContractRuntime: not enough data to execute switch block. Abandoning the execution." + ); + effect_builder + .announce_not_executing_block(executable_block.height) + .await; + return; + } + EraEndInstruction::Fatal(msg) => { + return fatal!(effect_builder, "{}", msg).await; + } }; + let current_gas_price = executable_block.current_gas_price; + let contract_runtime_metrics = metrics.clone(); let task = move || { debug!("ContractRuntime: execute_finalized_block"); execute_finalized_block( @@ -306,10 +304,11 @@ pub(super) async fn exec_or_requeue( } }; + // from this point onward we are dealing with the block we just created by executing let new_execution_pre_state = ExecutionPreState::from_block_header(block.header()); { // The `shared_pre_state` could have been set to a block we just fully synced after - // doing a sync leap (via a call to `set_initial_state`). We should not allow a block + // doing a sync leap (via a call to `set_execution_pre_state`). We should not allow a block // which completed execution just after this to set the `shared_pre_state` back to an // earlier block height. let mut shared_pre_state = shared_pre_state.lock().unwrap(); @@ -329,6 +328,7 @@ pub(super) async fn exec_or_requeue( } let current_era_id = block.era_id(); + let block_height = block.height(); if let Some(StepOutcome { step_effects, @@ -399,6 +399,8 @@ pub(super) async fn exec_or_requeue( ) .await; } + + // TODO: if it is an error why allow it in the first place? if meta_block_state .register_as_executed() .was_already_registered() @@ -411,19 +413,17 @@ pub(super) async fn exec_or_requeue( ); } - let meta_block = MetaBlock::new_forward(block, execution_artifacts, meta_block_state); - effect_builder.announce_meta_block(meta_block).await; - - // If the child is already finalized, start execution. - let next_block = exec_queue.remove(new_execution_pre_state.next_block_height()); - if let Some(next_era_gas_price) = maybe_next_era_gas_price { effect_builder .announce_new_era_gas_price(current_era_id.successor(), next_era_gas_price) .await; } + let meta_block = MetaBlock::new_forward(block, execution_artifacts, meta_block_state); + effect_builder.announce_meta_block(meta_block).await; + + let next_block = exec_queue.remove(new_execution_pre_state.next_block_height()); - // We schedule the next block from the queue to be executed: + // We schedule the next block from the queue to be executed, if available. if let Some(QueueItem { executable_block, meta_block_state, @@ -612,7 +612,7 @@ mod tests { #[test] fn calculation_is_lazy() { // NOTE: Range of EraInfos is lazy, so it does not consume memory, but getting the last - // batch out of u64::MAX of erainfos needs to iterate over all chunks. + // batch out of u64::MAX of era infos needs to iterate over all chunks. assert!(calculate_prune_eras(EraId::new(u64::MAX), 0, u64::MAX, 100,).is_none(),); assert_eq!( calculate_prune_eras(EraId::new(u64::MAX), 1, 100, 100) diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 074aee9c45..4e073b6008 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -50,7 +50,7 @@ use casper_storage::block_store::{ }; use std::{ - borrow::Cow, + borrow::{Borrow, Cow}, collections::{BTreeMap, BTreeSet, HashMap, HashSet}, convert::TryInto, fmt::{self, Display, Formatter}, @@ -343,6 +343,9 @@ impl Storage { // Truncate the sequences in case we removed blocks via a hard reset. if let Some(header) = DataReader::::read(&ro_txn, Tip)? { sequences.truncate(header.height()); + } else { + // No tip left, the database is empty + sequences.clear(); } component.completed_blocks = sequences; @@ -384,9 +387,84 @@ impl Storage { } } component.persist_completed_blocks()?; + component.warm_up_utilization_tracker()?; Ok(component) } + /// Assume: + /// * E1 is the newest era that the node is aware of (has complete blocks for it). + /// * the highest completed_blocks sequence is [h_1, h_2] + /// + /// The node might be coming back from a crash or joining while E1 is still running. + /// So effectively the node will have complete blocks that are relevant to E1 on disk. + /// Because the node might be a validator in E1 we need to figure out what the + /// utilization tracking for those stored blocks is. + /// Without this data we will not be able to produce a switch block at the end of E1. + /// If any of h_n in [h_1,h_2] is from a previous era (E0) - we will stop warming up for that + /// previous era - the node should never be interested in reexecuting any other block that was + /// marked complete. if there is an "older" disjoint sequence [h_1`, h_2`] that also has a + /// height h_n which is in E1 - we won't warm up on that sequence. We are only interested in the + /// latest consecutive block range of blocks that are in E1 + fn warm_up_utilization_tracker(&mut self) -> Result<(), FatalStorageError> { + let mut initial_era = None; + if let Some(sequence) = self.completed_blocks.highest_sequence() { + let high = sequence.high(); + let low = sequence.low(); + let mut data_to_insert = vec![]; + { + let txn_ro = self.block_store.checkout_ro()?; + for block_height in (low..=high).rev() { + let block: Block = txn_ro + .read(block_height)? + .ok_or(FatalStorageError::BlockNotFound(block_height))?; + let block_hash = *block.hash(); + let era_id = block.era_id(); + if let Some(initial_era) = initial_era { + if initial_era != era_id { + //We traversed back to a block that is in a previous era + // We have no business in tracking utilization for previous era + // since the node will never be executing blocks from previous eras. + break; + } + } else { + initial_era.replace(era_id); + } + let transaction_hashes: Vec = + block.all_transaction_hashes().collect(); + let execution_results = Self::fetch_results_for_transactions( + &txn_ro, + &block_hash, + transaction_hashes, + )? + .ok_or(FatalStorageError::MissingExecutionResults(block_height))?; + if execution_results.is_empty() { + data_to_insert.push((era_id, block_height, 0)); + } else { + let mut map = HashMap::new(); + for (hash, execution_result) in execution_results { + map.insert(hash, execution_result); + } + let utilization = Self::calculate_block_utilization( + &self.transaction_config, + &block, + &map, + ); + data_to_insert.push((era_id, block_height, utilization)); + } + } + } + for (era_id, block_height, utilization) in data_to_insert { + Self::insert_into_utilization_tracker( + &mut self.utilization_tracker, + era_id, + block_height, + utilization, + ); + } + } + Ok(()) + } + /// Returns the path to the storage folder. pub(crate) fn root_path(&self) -> &Path { &self.root @@ -763,9 +841,24 @@ impl Storage { execution_results, responder, } => { + let block_hash = *block_hash; let mut rw_txn = self.block_store.checkout_rw()?; + let maybe_block: Option = rw_txn.read(block_hash)?; + if let Some(block) = maybe_block { + let utilization = Self::calculate_block_utilization( + &self.transaction_config, + &block, + &execution_results, + ); + Self::insert_into_utilization_tracker( + &mut self.utilization_tracker, + era_id, + block_height, + utilization, + ); + } let _ = rw_txn.write(&BlockExecutionResults { - block_info: BlockHashHeightAndEra::new(*block_hash, block_height, era_id), + block_info: BlockHashHeightAndEra::new(block_hash, block_height, era_id), exec_results: execution_results, })?; rw_txn.commit()?; @@ -963,10 +1056,8 @@ impl Storage { responder, } => { let block: Block = (*block).clone().into(); - let transaction_config = self.transaction_config.clone(); responder .respond(self.put_executed_block( - transaction_config, &block, &approvals_hashes, execution_results, @@ -1002,17 +1093,14 @@ impl Storage { Some(db_raw) => responder.respond(Some(db_raw)).ignore(), } } - StorageRequest::GetBlockUtilizationScore { + StorageRequest::GetEraUtilizationScore { era_id, block_height, switch_block_utilization, responder, } => { - let utilization = self.get_block_utilization_score( - era_id, - block_height, - switch_block_utilization, - ); + let utilization = + self.get_era_utilization_score(era_id, block_height, switch_block_utilization); responder.respond(utilization).ignore() } @@ -1058,79 +1146,18 @@ impl Storage { pub(crate) fn put_executed_block( &mut self, - transaction_config: TransactionConfig, block: &Block, approvals_hashes: &ApprovalsHashes, execution_results: HashMap, ) -> Result { let mut txn = self.block_store.checkout_rw()?; let era_id = block.era_id(); - let block_utilization_score = block.block_utilization(transaction_config.clone()); - let has_hit_slot_limit = block.has_hit_slot_capacity(transaction_config.clone()); let block_hash = txn.write(block)?; let _ = txn.write(approvals_hashes)?; - let block_info = BlockHashHeightAndEra::new(block_hash, block.height(), block.era_id()); - - let utilization = if has_hit_slot_limit { - debug!("Block is at slot capacity, using slot utilization score"); - block_utilization_score - } else if execution_results.is_empty() { - 0u64 - } else { - let total_gas_utilization = { - let total_gas_limit: U512 = execution_results - .values() - .map(|results| match results { - ExecutionResult::V1(v1_result) => match v1_result { - ExecutionResultV1::Failure { cost, .. } => *cost, - ExecutionResultV1::Success { cost, .. } => *cost, - }, - ExecutionResult::V2(v2_result) => v2_result.limit.value(), - }) - .sum(); - - let consumed: u64 = total_gas_limit.as_u64(); - let block_gas_limit = transaction_config.block_gas_limit; - - Ratio::new(consumed * 100u64, block_gas_limit).to_integer() - }; - debug!("Gas utilization at {total_gas_utilization}"); - - let total_size_utilization = { - let size_used: u64 = execution_results - .values() - .map(|results| { - if let ExecutionResult::V2(result) = results { - result.size_estimate - } else { - 0u64 - } - }) - .sum(); - - let block_size_limit = transaction_config.max_block_size as u64; - Ratio::new(size_used * 100, block_size_limit).to_integer() - }; - - debug!("Storage utilization at {total_size_utilization}"); - - let scores = [ - block_utilization_score, - total_size_utilization, - total_gas_utilization, - ]; - - match scores.iter().max() { - Some(max_utlization) => *max_utlization, - None => { - // This should never happen as we just created the scores vector to find the - // max value - warn!("Unable to determine max utilization, marking 0 utilization"); - 0u64 - } - } - }; + let utilization = + Self::calculate_block_utilization(&self.transaction_config, block, &execution_results); + let block_info = BlockHashHeightAndEra::new(block_hash, block.height(), block.era_id()); debug!("Utilization for block is {utilization}"); let _ = txn.write(&BlockExecutionResults { @@ -1139,16 +1166,12 @@ impl Storage { })?; txn.commit()?; - match self.utilization_tracker.get_mut(&era_id) { - Some(block_score) => { - block_score.insert(block.height(), utilization); - } - None => { - let mut block_score = BTreeMap::new(); - block_score.insert(block.height(), utilization); - self.utilization_tracker.insert(era_id, block_score); - } - } + Self::insert_into_utilization_tracker( + &mut self.utilization_tracker, + era_id, + block.height(), + utilization, + ); Ok(true) } @@ -1977,23 +2000,7 @@ impl Storage { .collect(), BlockBody::V2(v2) => v2.all_transactions().copied().collect(), }; - let mut execution_results = vec![]; - for transaction_hash in transaction_hashes { - match txn.read(transaction_hash)? { - None => { - debug!( - %block_hash, - %transaction_hash, - "retrieved block but execution result for given transaction is absent" - ); - return Ok(None); - } - Some(execution_result) => { - execution_results.push((transaction_hash, execution_result)); - } - } - } - Ok(Some(execution_results)) + Self::fetch_results_for_transactions(txn, block_hash, transaction_hashes) } #[allow(clippy::type_complexity)] @@ -2033,37 +2040,178 @@ impl Storage { Ok(Some(ret)) } - fn get_block_utilization_score( + fn get_era_utilization_score( &mut self, era_id: EraId, - block_height: u64, - block_utilization: u64, - ) -> Option<(u64, u64)> { + era_switch_block_height: u64, + era_switch_block_utilization: u64, + ) -> Option<(u64, u64, u64)> { let ret = match self.utilization_tracker.get_mut(&era_id) { Some(utilization) => { - utilization.entry(block_height).or_insert(block_utilization); + utilization + .entry(era_switch_block_height) + .or_insert(era_switch_block_utilization); - let transaction_count = utilization.values().sum(); + let era_utilization = utilization.values().sum(); let block_count = utilization.keys().len() as u64; + let total_blocks_for_era = match era_id.predecessor() { + Some(previous_era) => { + let txn = match self.block_store.checkout_ro() { + Ok(txn) => txn, + Err(_) => return None, + }; + let previous_switch_block_height = + match txn.get_switch_block_height(previous_era) { + Ok(Some(height)) => height, + Ok(None) | Err(_) => return None, + }; + // Determine expected number of blocks from the block_height + // minus the height of the previous switch block + // sw-e1 -> b1 b2 b3 b4 sw-e2 + // 11 12 13 14 15 16 + // answer: 5 (16-11) + era_switch_block_height.saturating_sub(previous_switch_block_height) + } + // Genesis case + None => era_switch_block_height, + }; - Some((transaction_count, block_count)) + Some((era_utilization, block_count, total_blocks_for_era)) } None => { let mut utilization = BTreeMap::new(); - utilization.insert(block_height, block_utilization); + utilization.insert(era_switch_block_height, era_switch_block_utilization); self.utilization_tracker.insert(era_id, utilization); let block_count = 1u64; - Some((block_utilization, block_count)) + let total_blocks_for_era = block_count; + Some(( + era_switch_block_utilization, + block_count, + total_blocks_for_era, + )) } }; self.utilization_tracker - .retain(|key_era_id, _| key_era_id.value() + 2 >= era_id.value()); + .retain(|key_era_id, _| key_era_id.value() >= era_id.value()); ret } + + fn calculate_block_utilization( + transaction_config_input: impl Borrow, + block: &Block, + execution_results: &HashMap, + ) -> u64 { + let transaction_config = transaction_config_input.borrow(); + let block_utilization_score = block.block_utilization(transaction_config); + let has_hit_slot_limit = block.has_hit_slot_capacity(transaction_config); + + let utilization = if has_hit_slot_limit { + debug!("Block is at slot capacity, using slot utilization score"); + block_utilization_score + } else if execution_results.is_empty() { + 0u64 + } else { + let total_gas_utilization = { + let total_gas_limit: U512 = execution_results + .values() + .map(|results| match results { + ExecutionResult::V1(v1_result) => match v1_result { + ExecutionResultV1::Failure { cost, .. } => *cost, + ExecutionResultV1::Success { cost, .. } => *cost, + }, + ExecutionResult::V2(v2_result) => v2_result.limit.value(), + }) + .sum(); + + let consumed: u64 = total_gas_limit.as_u64(); + let block_gas_limit = transaction_config.block_gas_limit; + + Ratio::new(consumed * 100u64, block_gas_limit).to_integer() + }; + debug!("Gas utilization at {total_gas_utilization}"); + + let total_size_utilization = { + let size_used: u64 = execution_results + .values() + .map(|results| { + if let ExecutionResult::V2(result) = results { + result.size_estimate + } else { + 0u64 + } + }) + .sum(); + + let block_size_limit = transaction_config.max_block_size as u64; + Ratio::new(size_used * 100, block_size_limit).to_integer() + }; + + debug!("Storage utilization at {total_size_utilization}"); + + let scores = [ + block_utilization_score, + total_size_utilization, + total_gas_utilization, + ]; + + match scores.iter().max() { + Some(max_utilization) => *max_utilization, + None => { + // This should never happen as we just created the scores vector to find the + // max value + warn!("Unable to determine max utilization, marking 0 utilization"); + 0u64 + } + } + }; + utilization + } + + fn fetch_results_for_transactions( + txn: &(impl DataReader + DataReader), + block_hash: &BlockHash, + transaction_hashes: Vec, + ) -> Result>, FatalStorageError> { + let mut execution_results = vec![]; + for transaction_hash in transaction_hashes { + match txn.read(transaction_hash)? { + None => { + debug!( + %block_hash, + %transaction_hash, + "retrieved block but execution result for given transaction is absent" + ); + return Ok(None); + } + Some(execution_result) => { + execution_results.push((transaction_hash, execution_result)); + } + } + } + Ok(Some(execution_results)) + } + + fn insert_into_utilization_tracker( + utilization_tracker: &mut BTreeMap>, + era_id: EraId, + block_height: u64, + utilization: u64, + ) { + match utilization_tracker.get_mut(&era_id) { + Some(block_score) => { + block_score.insert(block_height, utilization); + } + None => { + let mut block_score = BTreeMap::new(); + block_score.insert(block_height, utilization); + utilization_tracker.insert(era_id, block_score); + } + } + } } /// Decodes an item's ID, typically from an incoming request. @@ -2369,4 +2517,23 @@ impl Storage { execution_result, }) } + + pub(crate) fn delete_block_utilization_score_by_block_hash(&mut self, block_hash: BlockHash) { + let txn = self.block_store.checkout_ro().expect("mut get read only"); + let block_header: BlockHeader = txn + .read(block_hash) + .expect("should read") + .expect("must have header"); + + let era = block_header.era_id(); + let height = block_header.height(); + + let era_score = self + .utilization_tracker + .get_mut(&era) + .expect("must have era tracker"); + era_score + .remove(&height) + .expect("must have previous entry for this height"); + } } diff --git a/node/src/components/storage/disjoint_sequences.rs b/node/src/components/storage/disjoint_sequences.rs index 3e10e803f1..6309b85efb 100644 --- a/node/src/components/storage/disjoint_sequences.rs +++ b/node/src/components/storage/disjoint_sequences.rs @@ -217,6 +217,10 @@ impl DisjointSequences { true }) } + + pub(super) fn clear(&mut self) { + self.sequences.clear(); + } } #[cfg(test)] impl DisjointSequences { diff --git a/node/src/components/storage/error.rs b/node/src/components/storage/error.rs index 9ba2fcf5f5..d5787e42e7 100644 --- a/node/src/components/storage/error.rs +++ b/node/src/components/storage/error.rs @@ -163,6 +163,12 @@ pub enum FatalStorageError { /// BlockStoreError #[error("unexpected record id {0}")] UnexpectedRecordId(RecordId), + /// Block not found + #[error("Couldn't find block at height {0}")] + BlockNotFound(u64), + /// Execution results missing + #[error("Couldn't find all execution results for block at height: {0}")] + MissingExecutionResults(u64), } impl From> for FatalStorageError { diff --git a/node/src/components/storage/tests.rs b/node/src/components/storage/tests.rs index 4ec42a91ef..1ba5ea286c 100644 --- a/node/src/components/storage/tests.rs +++ b/node/src/components/storage/tests.rs @@ -27,8 +27,8 @@ use casper_types::{ BlockSignatures, BlockSignaturesV2, BlockV2, ChainNameDigest, Chainspec, ChainspecRawBytes, Deploy, DeployHash, Digest, EraId, ExecutionInfo, FinalitySignature, FinalitySignatureV2, Gas, InitiatorAddr, ProtocolVersion, PublicKey, SecretKey, TestBlockBuilder, TestBlockV1Builder, - TimeDiff, Transaction, TransactionConfig, TransactionHash, TransactionV1Hash, Transfer, - TransferV2, U512, + TimeDiff, Timestamp, Transaction, TransactionConfig, TransactionHash, TransactionV1Hash, + Transfer, TransferV2, U512, }; use tempfile::tempdir; @@ -45,7 +45,8 @@ use crate::{ storage::TransactionHeader, testing::{ComponentHarness, UnitTestEvent}, types::{ - sync_leap_validation_metadata::SyncLeapValidationMetaData, BlockWithMetadata, + sync_leap_validation_metadata::SyncLeapValidationMetaData, + transaction::transaction_v1_builder::TransactionV1Builder, BlockWithMetadata, SyncLeapIdentifier, }, utils::{Loadable, WithDir}, @@ -3124,3 +3125,112 @@ fn check_block_operations_with_node_1_5_2_storage() { ); } } + +#[test] +fn storage_should_warm_up_utilization_tracking() { + let mut harness = ComponentHarness::default(); + let pk = PublicKey::random_ed25519(&mut harness.rng); + let transaction = TransactionV1Builder::new() + .with_chain_name("a") + .with_timestamp(Timestamp::now()) + .with_initiator_addr(pk) + .build() + .unwrap(); + let transaction_hash = *transaction.hash(); + + let block_32 = TestBlockBuilder::new() + .era(1) + .height(32) + .protocol_version(ProtocolVersion::from_parts(2, 0, 0)) + .switch_block(true) + .build_versioned(&mut harness.rng); + let block_33 = TestBlockBuilder::new() + .era(2) + .height(33) + .protocol_version(ProtocolVersion::from_parts(2, 0, 0)) + .switch_block(false) + .build_versioned(&mut harness.rng); + let block_34 = TestBlockBuilder::new() + .era(2) + .height(34) + .protocol_version(ProtocolVersion::from_parts(2, 0, 0)) + .transactions(vec![&Transaction::V1(transaction)]) + .switch_block(false) + .build_versioned(&mut harness.rng); + + let mut storage = storage_fixture(&harness); + put_complete_block(&mut harness, &mut storage, block_32.clone()); + put_complete_block(&mut harness, &mut storage, block_33.clone()); + put_complete_block(&mut harness, &mut storage, block_34.clone()); + let mut execution_results: HashMap = HashMap::new(); + execution_results.insert( + TransactionHash::V1(transaction_hash), + ExecutionResult::from(ExecutionResultV2::random(&mut harness.rng)), + ); + put_execution_results( + &mut harness, + &mut storage, + *block_34.hash(), + block_34.height(), + block_34.era_id(), + execution_results, + ); + drop(storage); + // We want the warm up to happen again + let storage = storage_fixture(&harness); + // We don't care about old eras + assert!(!storage.utilization_tracker.contains_key(&EraId::new(1))); + let utilization_for_era_2 = storage + .utilization_tracker + .get(&EraId::new(2)) + .expect("expected entry for era: 2"); + assert_eq!( + *utilization_for_era_2 + .get(&33) + .expect("expected entry for h: 33"), + 0_u64 + ); + assert!( + *utilization_for_era_2 + .get(&34) + .expect("expected entry for h: 34") + > 0 + ); //We don't really care about the value, but there were execution results so it shouldn't be + // 0 +} + +#[test] +fn storage_warm_up_should_ignore_old_disjoint_sequence() { + let mut harness = ComponentHarness::default(); + + let block_31 = TestBlockBuilder::new() + .era(2) + .height(31) + .protocol_version(ProtocolVersion::from_parts(2, 0, 0)) + .switch_block(true) + .build_versioned(&mut harness.rng); + let block_33 = TestBlockBuilder::new() + .era(2) + .height(33) + .protocol_version(ProtocolVersion::from_parts(2, 0, 0)) + .switch_block(false) + .build_versioned(&mut harness.rng); + + let mut storage = storage_fixture(&harness); + put_complete_block(&mut harness, &mut storage, block_31.clone()); + put_complete_block(&mut harness, &mut storage, block_33.clone()); + drop(storage); + // We want the warm up to happen again + let storage = storage_fixture(&harness); + let utilization_for_era_2 = storage + .utilization_tracker + .get(&EraId::new(2)) + .expect("expected entry for era: 2"); + assert!(utilization_for_era_2.get(&31_u64).is_none()); + assert_eq!( + *utilization_for_era_2 + .get(&33_u64) + .expect("expected value for h: 33"), + 0 + ); +} diff --git a/node/src/effect.rs b/node/src/effect.rs index caf27998c9..08e3c73273 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -153,6 +153,7 @@ use crate::{ transaction_acceptor, }, contract_runtime::ExecutionPreState, + effect::announcements::NonExecutableBlockAnnouncement, failpoints::FailpointActivation, reactor::{main_reactor::ReactorState, EventQueueHandle, QueueKind}, types::{ @@ -1093,17 +1094,17 @@ impl EffectBuilder { .await } - pub(crate) async fn get_block_utilization( + pub(crate) async fn get_era_utilization( self, era_id: EraId, block_height: u64, transaction_count: u64, - ) -> Option<(u64, u64)> + ) -> Option<(u64, u64, u64)> where REv: From, { self.make_request( - |responder| StorageRequest::GetBlockUtilizationScore { + |responder| StorageRequest::GetEraUtilizationScore { era_id, block_height, switch_block_utilization: transaction_count, @@ -1421,6 +1422,9 @@ impl EffectBuilder { .await } + /// This is currently used for reporting purposes (node status). It should not be used + /// for load bearing determinations, as the reactor state can change between asking for it + /// and being notified about it due to event processing latency. pub(crate) async fn get_reactor_state(self) -> ReactorState where REv: From, @@ -1896,7 +1900,7 @@ impl EffectBuilder { /// Announces that a finalized block has been created, but it was not /// executed. - pub(crate) async fn announce_unexecuted_block(self, block_height: u64) + pub(crate) async fn announce_not_enqueuing_old_executable_block(self, block_height: u64) where REv: From, { @@ -1908,6 +1912,20 @@ impl EffectBuilder { .await; } + /// Announces that a finalized block has been created, but it was not + /// executed due to subjective node state. + pub(crate) async fn announce_not_executing_block(self, block_height: u64) + where + REv: From, + { + self.event_queue + .schedule( + NonExecutableBlockAnnouncement(block_height), + QueueKind::Regular, + ) + .await; + } + /// An equivocation has been detected. pub(crate) async fn announce_fault_event( self, diff --git a/node/src/effect/announcements.rs b/node/src/effect/announcements.rs index 4171c77679..629d927b96 100644 --- a/node/src/effect/announcements.rs +++ b/node/src/effect/announcements.rs @@ -164,6 +164,19 @@ impl Display for UnexecutedBlockAnnouncement { } } +#[derive(DataSize, Serialize, Debug)] +pub(crate) struct NonExecutableBlockAnnouncement(pub(crate) u64); + +impl Display for NonExecutableBlockAnnouncement { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "announcement for a block that couldn't be executed at height {}", + self.0, + ) + } +} + /// Queue dump format with handler. #[derive(Serialize)] pub(crate) enum QueueDumpFormat { diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index 59bfb46157..a0f8c52869 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -514,8 +514,8 @@ pub(crate) enum StorageRequest { }, /// Retrieve the height of the final block of the previous protocol version, if known. GetKeyBlockHeightForActivationPoint { responder: Responder> }, - /// Retrieve the block utilization score. - GetBlockUtilizationScore { + /// Retrieve the era utilization score. + GetEraUtilizationScore { /// The era id. era_id: EraId, /// The block height of the switch block @@ -523,7 +523,7 @@ pub(crate) enum StorageRequest { /// The utilization within the switch block. switch_block_utilization: u64, /// Responder, responded once the utilization for the era has been determined. - responder: Responder>, + responder: Responder>, }, } @@ -677,7 +677,7 @@ impl Display for StorageRequest { } => { write!(formatter, "get raw data {}::{:?}", record_id, key) } - StorageRequest::GetBlockUtilizationScore { era_id, .. } => { + StorageRequest::GetEraUtilizationScore { era_id, .. } => { write!(formatter, "get utilization score for era {}", era_id) } } diff --git a/node/src/reactor/main_reactor.rs b/node/src/reactor/main_reactor.rs index 80678bb7ff..4e91d452d7 100644 --- a/node/src/reactor/main_reactor.rs +++ b/node/src/reactor/main_reactor.rs @@ -51,8 +51,7 @@ use crate::{ storage::Storage, sync_leaper::SyncLeaper, transaction_acceptor::{self, TransactionAcceptor}, - transaction_buffer, - transaction_buffer::TransactionBuffer, + transaction_buffer::{self, TransactionBuffer}, upgrade_watcher::{self, UpgradeWatcher}, Component, ValidatorBoundComponent, }, @@ -61,8 +60,9 @@ use crate::{ BlockAccumulatorAnnouncement, ConsensusAnnouncement, ContractRuntimeAnnouncement, ControlAnnouncement, FetchedNewBlockAnnouncement, FetchedNewFinalitySignatureAnnouncement, GossiperAnnouncement, MetaBlockAnnouncement, - PeerBehaviorAnnouncement, TransactionAcceptorAnnouncement, - TransactionBufferAnnouncement, UnexecutedBlockAnnouncement, UpgradeWatcherAnnouncement, + NonExecutableBlockAnnouncement, PeerBehaviorAnnouncement, + TransactionAcceptorAnnouncement, TransactionBufferAnnouncement, + UnexecutedBlockAnnouncement, UpgradeWatcherAnnouncement, }, incoming::{NetResponseIncoming, TrieResponseIncoming}, requests::{ @@ -203,6 +203,8 @@ pub(crate) struct MainReactor { finality_signature_creation: bool, prevent_validator_shutdown: bool, + + force_catchup: bool, } impl reactor::Reactor for MainReactor { @@ -308,7 +310,12 @@ impl reactor::Reactor for MainReactor { Effects::new() } } - + MainEvent::NonExecutableBlockAnnouncement(NonExecutableBlockAnnouncement( + _block_height, + )) => { + self.force_catchup = true; + Effects::new() + } // LOCAL I/O BOUND COMPONENTS MainEvent::UpgradeWatcher(event) => reactor::wrap_effects( MainEvent::UpgradeWatcher, @@ -1290,6 +1297,7 @@ impl reactor::Reactor for MainReactor { node_startup_instant, finality_signature_creation: true, prevent_validator_shutdown, + force_catchup: false, }; info!("MainReactor: instantiated"); diff --git a/node/src/reactor/main_reactor/control.rs b/node/src/reactor/main_reactor/control.rs index 8de6a3394a..d62e8a034e 100644 --- a/node/src/reactor/main_reactor/control.rs +++ b/node/src/reactor/main_reactor/control.rs @@ -188,6 +188,9 @@ impl MainReactor { (Duration::ZERO, Effects::new()) } KeepUpInstruction::Validate(effects) => { + if let Err(msg) = self.refresh_contract_runtime() { + return (Duration::ZERO, fatal!(effect_builder, "{}", msg).ignore()); + } info!("KeepUp: switch to Validate"); // purge to avoid polluting the status endpoints w/ stale state self.block_synchronizer.purge(); @@ -548,7 +551,8 @@ impl MainReactor { parent_hash, parent_seed, ); - self.contract_runtime.set_initial_state(initial_pre_state); + self.contract_runtime + .set_execution_pre_state(initial_pre_state); } pub(super) fn update_last_progress( diff --git a/node/src/reactor/main_reactor/event.rs b/node/src/reactor/main_reactor/event.rs index 52ec2cd6b8..b15f5e9046 100644 --- a/node/src/reactor/main_reactor/event.rs +++ b/node/src/reactor/main_reactor/event.rs @@ -23,8 +23,9 @@ use crate::{ BlockAccumulatorAnnouncement, ConsensusAnnouncement, ContractRuntimeAnnouncement, ControlAnnouncement, FatalAnnouncement, FetchedNewBlockAnnouncement, FetchedNewFinalitySignatureAnnouncement, GossiperAnnouncement, MetaBlockAnnouncement, - PeerBehaviorAnnouncement, TransactionAcceptorAnnouncement, - TransactionBufferAnnouncement, UnexecutedBlockAnnouncement, UpgradeWatcherAnnouncement, + NonExecutableBlockAnnouncement, PeerBehaviorAnnouncement, + TransactionAcceptorAnnouncement, TransactionBufferAnnouncement, + UnexecutedBlockAnnouncement, UpgradeWatcherAnnouncement, }, diagnostics_port::DumpConsensusStateRequest, incoming::{ @@ -242,6 +243,8 @@ pub(crate) enum MainEvent { MetaBlockAnnouncement(MetaBlockAnnouncement), #[from] UnexecutedBlockAnnouncement(UnexecutedBlockAnnouncement), + #[from] + NonExecutableBlockAnnouncement(NonExecutableBlockAnnouncement), // Event related to figuring out validators for blocks after upgrades. GotBlockAfterUpgradeEraValidators(EraId, EraValidators, EraValidators), @@ -360,6 +363,7 @@ impl ReactorEvent for MainEvent { "GotImmediateSwitchBlockEraValidators" } MainEvent::BinaryPort(_) => "BinaryPort", + MainEvent::NonExecutableBlockAnnouncement(_) => "NonExecutableBlockAnnouncement", } } } @@ -544,6 +548,7 @@ impl Display for MainEvent { ) } MainEvent::BinaryPort(inner) => Display::fmt(inner, f), + MainEvent::NonExecutableBlockAnnouncement(inner) => Display::fmt(inner, f), } } } diff --git a/node/src/reactor/main_reactor/keep_up.rs b/node/src/reactor/main_reactor/keep_up.rs index 925be6f7fd..8acca57b8b 100644 --- a/node/src/reactor/main_reactor/keep_up.rs +++ b/node/src/reactor/main_reactor/keep_up.rs @@ -70,6 +70,10 @@ impl MainReactor { // controlled shutdown for protocol upgrade. return KeepUpInstruction::ShutdownForUpgrade; } + if self.force_catchup { + self.force_catchup = false; + return KeepUpInstruction::CatchUp; + } // if there is instruction, return to start working on it // else fall thru with the current best available id for block syncing @@ -256,7 +260,7 @@ impl MainReactor { } } SyncInstruction::BlockSync { block_hash } => { - debug!("KeepUp: BlockSync: {:?}", block_hash); + info!("KeepUp: BlockSync: {:?}", block_hash); if self .block_synchronizer .register_block_by_hash(block_hash, false) diff --git a/node/src/reactor/main_reactor/tests.rs b/node/src/reactor/main_reactor/tests.rs index 1b22e06eaa..c9944e25dc 100644 --- a/node/src/reactor/main_reactor/tests.rs +++ b/node/src/reactor/main_reactor/tests.rs @@ -57,6 +57,10 @@ impl Runner>> { fn main_reactor(&self) -> &MainReactor { self.reactor().inner().inner() } + + fn main_reactor_as_mut(&mut self) -> &mut MainReactor { + self.reactor.inner_mut().inner_mut() + } } /// Given a block height and a node id, returns a predicate to check if the lowest available block diff --git a/node/src/reactor/main_reactor/tests/configs_override.rs b/node/src/reactor/main_reactor/tests/configs_override.rs index a3e243c036..5734f1e776 100644 --- a/node/src/reactor/main_reactor/tests/configs_override.rs +++ b/node/src/reactor/main_reactor/tests/configs_override.rs @@ -128,6 +128,15 @@ impl ConfigsOverride { self } + pub(crate) fn with_idle_tolerance(mut self, idle_tolernace: TimeDiff) -> Self { + let config = NodeConfigOverride { + idle_tolerance: Some(idle_tolernace), + ..Default::default() + }; + self.node_config_override = config; + self + } + pub(crate) fn with_minimum_delegation_rate(mut self, minimum_delegation_rate: u8) -> Self { self.minimum_delegation_rate = minimum_delegation_rate; self @@ -171,4 +180,5 @@ impl Default for ConfigsOverride { #[derive(Clone, Default)] pub(crate) struct NodeConfigOverride { pub sync_handling_override: Option, + pub idle_tolerance: Option, } diff --git a/node/src/reactor/main_reactor/tests/fixture.rs b/node/src/reactor/main_reactor/tests/fixture.rs index 6c6a9b8f15..4bbda7c74e 100644 --- a/node/src/reactor/main_reactor/tests/fixture.rs +++ b/node/src/reactor/main_reactor/tests/fixture.rs @@ -23,9 +23,9 @@ use casper_types::{ system::auction::{DelegationRate, DelegatorKind}, testing::TestRng, AccountConfig, AccountsConfig, ActivationPoint, AddressableEntityHash, Block, BlockBody, - BlockHash, BlockV2, CLValue, Chainspec, ChainspecRawBytes, EraId, Key, Motes, NextUpgrade, - ProtocolVersion, PublicKey, SecretKey, StoredValue, SystemHashRegistry, TimeDiff, Timestamp, - Transaction, TransactionHash, ValidatorConfig, U512, + BlockHash, BlockV2, CLValue, Chainspec, ChainspecRawBytes, EraEnd, EraId, Key, Motes, + NextUpgrade, ProtocolVersion, PublicKey, SecretKey, StoredValue, SystemHashRegistry, TimeDiff, + Timestamp, Transaction, TransactionHash, ValidatorConfig, U512, }; use crate::{ @@ -404,10 +404,14 @@ impl TestFixture { }; let NodeConfigOverride { sync_handling_override, + idle_tolerance, } = node_config_override; if let Some(sync_handling) = sync_handling_override { cfg.node.sync_handling = sync_handling; } + if let Some(idle) = idle_tolerance { + cfg.node.idle_tolerance = idle + } // Additionally set up storage in a temporary directory. let (storage_cfg, temp_dir) = storage::Config::new_for_tests(storage_multiplier); @@ -901,6 +905,60 @@ impl TestFixture { } } + pub(crate) fn delete_block_utilization_score_by_block_hash_in_node( + &mut self, + node_public_key: &PublicKey, + block_hash: BlockHash, + ) { + let (_, runner) = self + .network + .nodes_mut() + .iter_mut() + .find(|(_, runner)| runner.main_reactor().consensus.public_key() == node_public_key) + .expect("should have runner"); + + runner + .main_reactor_as_mut() + .storage + .delete_block_utilization_score_by_block_hash(block_hash) + } + + pub(crate) async fn check_gas_price_for_nodes( + &mut self, + expected_gas_price: u8, + within: Duration, + ) { + self.try_run_until( + move |nodes| { + nodes.values().all(|runner| { + let era_end = runner + .main_reactor() + .storage() + .read_highest_switch_block_headers(1) + .unwrap() + .last() + .expect("must have block header") + .clone_era_end() + .expect("must have era end for switch block"); + + if let EraEnd::V2(era_end) = era_end { + era_end.next_era_gas_price() == expected_gas_price + } else { + false + } + }) + }, + within, + ) + .await + .unwrap_or_else(|_| { + panic!( + "should have same gas price across all nodes within {} seconds", + within.as_secs_f64(), + ) + }) + } + /// Returns the execution results from storage. /// Panics on error. #[track_caller] diff --git a/node/src/reactor/main_reactor/tests/gas_price.rs b/node/src/reactor/main_reactor/tests/gas_price.rs index 9608716c5b..5496a624e7 100644 --- a/node/src/reactor/main_reactor/tests/gas_price.rs +++ b/node/src/reactor/main_reactor/tests/gas_price.rs @@ -1,13 +1,14 @@ use std::{sync::Arc, time::Duration}; use casper_types::{ - testing::TestRng, Chainspec, PricingHandling, PricingMode, PublicKey, SecretKey, TimeDiff, - Transaction, TransactionV1Config, U512, + testing::TestRng, Chainspec, EraId, PricingHandling, PricingMode, PublicKey, SecretKey, + TimeDiff, Transaction, TransactionV1Config, U512, }; use crate::{ reactor::main_reactor::tests::{ - configs_override::ConfigsOverride, fixture::TestFixture, ERA_ONE, ONE_MIN, + configs_override::ConfigsOverride, fixture::TestFixture, initial_stakes::InitialStakes, + ERA_ONE, ERA_TWO, ERA_ZERO, ONE_MIN, TEN_SECS, THIRTY_SECS, }, types::transaction::transaction_v1_builder::TransactionV1Builder, }; @@ -44,6 +45,7 @@ async fn run_gas_price_scenario(gas_price_scenario: GasPriceScenario) { let non_validating_public_key = PublicKey::from(&non_validating_secret_key); secret_keys.push(Arc::new(non_validating_secret_key)); + let min_gas_price: u8 = 1; let max_gas_price: u8 = 3; let mut transaction_config = TransactionV1Config::default(); @@ -53,6 +55,10 @@ async fn run_gas_price_scenario(gas_price_scenario: GasPriceScenario) { let spec_override = match gas_price_scenario { GasPriceScenario::SlotUtilization => { + let mut transaction_config = TransactionV1Config::default(); + transaction_config + .native_mint_lane + .set_max_transaction_count(1); ConfigsOverride::default().with_transaction_v1_config(transaction_config) } GasPriceScenario::SizeUtilization(block_size) => { @@ -65,25 +71,41 @@ async fn run_gas_price_scenario(gas_price_scenario: GasPriceScenario) { .with_lower_threshold(5u64) .with_upper_threshold(10u64) .with_minimum_era_height(5) + .with_min_gas_price(min_gas_price) .with_max_gas_price(max_gas_price); let mut fixture = TestFixture::new_with_keys(rng, secret_keys, stakes, Some(spec_override)).await; + assert_eq!( + min_gas_price, + fixture.chainspec.vacancy_config.min_gas_price + ); + assert_eq!( + max_gas_price, + fixture.chainspec.vacancy_config.max_gas_price + ); + let alice_secret_key = Arc::clone(&fixture.node_contexts[0].secret_key); let alice_public_key = PublicKey::from(&*alice_secret_key); fixture - .run_until_stored_switch_block_header(ERA_ONE, ONE_MIN) + .run_until_stored_switch_block_header(ERA_ZERO, ONE_MIN) .await; - let switch_block = fixture.switch_block(ERA_ONE); + let mut switch_block = fixture.switch_block(ERA_ZERO); + let mut next_gas_price = switch_block + .era_end() + .expect("this is a switch block") + .next_era_gas_price(); + assert_eq!(next_gas_price, min_gas_price, "price should start at min"); let mut current_era = switch_block.era_id(); let chain_name = fixture.chainspec.network_config.name.clone(); + assert_eq!(current_era, EraId::new(0), "current era should be genesis"); // Run the network at load for at least 5 eras. - for _ in 0..5 { + for idx in 1..=max_gas_price { let rng = fixture.rng_mut(); let target_public_key = PublicKey::random(rng); let fixed_native_mint_transaction = @@ -105,15 +127,38 @@ async fn run_gas_price_scenario(gas_price_scenario: GasPriceScenario) { fixture .run_until_stored_switch_block_header(next_era, ONE_MIN) .await; + switch_block = fixture.switch_block(EraId::new(idx as u64)); + next_gas_price = switch_block + .era_end() + .expect("this is a switch block") + .next_era_gas_price(); + let expected = { + let mut val = min_gas_price + idx; + if val > max_gas_price { + val = max_gas_price; + } + val + }; + assert_eq!( + next_gas_price, expected, + "price goes up by 1 each era (with current settings), up to the max" + ); current_era = next_era; } - let expected_gas_price = fixture.chainspec.vacancy_config.max_gas_price; - let actual_gas_price = fixture.get_current_era_price(); - assert_eq!(actual_gas_price, expected_gas_price); + assert_eq!( + next_gas_price, max_gas_price, + "calculated gas price should match the max gas price" + ); + assert_eq!( + current_era, + EraId::new(max_gas_price as u64), + "we cranked a number of eras to walk up to the max price" + ); + let gas_price_for_non_validating_node = fixture.get_block_gas_price_by_public_key(Some(&non_validating_public_key)); - assert_eq!(actual_gas_price, gas_price_for_non_validating_node); + assert_eq!(max_gas_price, gas_price_for_non_validating_node); let rng = fixture.rng_mut(); let target_public_key = PublicKey::random(rng); @@ -193,3 +238,88 @@ async fn should_raise_gas_price_to_ceiling_and_reduce_to_floor_based_on_size_con let scenario = GasPriceScenario::SizeUtilization(size_limit); run_gas_price_scenario(scenario).await } + +#[tokio::test] +async fn gas_price_calc_should_not_stall_network() { + let initial_stakes = InitialStakes::AllEqual { + count: 5, + stake: 1_000_000_000, + }; + + let min_gas_price: u8 = 1; + let max_gas_price: u8 = 3; + let minimum_era_height = 5; + + let mut transaction_config = TransactionV1Config::default(); + transaction_config + .native_mint_lane + .set_max_transaction_count(1); + + let spec_override = ConfigsOverride::default() + .with_transaction_v1_config(transaction_config) + .with_lower_threshold(5u64) + .with_upper_threshold(10u64) + .with_minimum_era_height(minimum_era_height) + .with_idle_tolerance(TimeDiff::from_seconds(1)) + .with_min_gas_price(min_gas_price) + .with_max_gas_price(max_gas_price); + + let mut fixture = TestFixture::new(initial_stakes, Some(spec_override)).await; + + // Run through the first era. + fixture + .run_until_stored_switch_block_header(ERA_ONE, ONE_MIN) + .await; + + let chain_name = fixture.chainspec.network_config.name.clone(); + let secret_key = Arc::clone(&fixture.node_contexts[0].secret_key); + + let rng = fixture.rng_mut(); + let target_public_key = PublicKey::random(rng); + let node_secret_key = Arc::clone(&fixture.node_contexts[0].secret_key); + let node_public_key = PublicKey::from(&*node_secret_key); + + let fixed_native_mint_transaction = TransactionV1Builder::new_transfer( + 10_000_000_000u64, + None, + target_public_key.clone(), + None, + ) + .expect("must get builder") + .with_chain_name(chain_name.clone()) + .with_secret_key(&secret_key) + .with_ttl(TimeDiff::from_seconds(120 * 10)) + .with_pricing_mode(PricingMode::Fixed { + gas_price_tolerance: max_gas_price, + additional_computation_factor: 0, + }) + .build() + .expect("must get transaction"); + + let txn = Transaction::V1(fixed_native_mint_transaction); + let txn_hash = txn.hash(); + fixture.inject_transaction(txn).await; + + fixture + .run_until_executed_transaction(&txn_hash, TEN_SECS) + .await; + + let block_hash = *fixture.highest_complete_block().hash(); + + fixture.delete_block_utilization_score_by_block_hash_in_node(&node_public_key, block_hash); + + fixture + .run_until_stored_switch_block_header(ERA_TWO, ONE_MIN) + .await; + + let gas_price = fixture + .switch_block(ERA_TWO) + .header() + .era_end() + .unwrap() + .next_era_gas_price(); + + fixture + .check_gas_price_for_nodes(gas_price, THIRTY_SECS) + .await; +} diff --git a/node/src/reactor/main_reactor/tests/network_general.rs b/node/src/reactor/main_reactor/tests/network_general.rs index 55f3100d4c..5d3e2fc5a8 100644 --- a/node/src/reactor/main_reactor/tests/network_general.rs +++ b/node/src/reactor/main_reactor/tests/network_general.rs @@ -316,6 +316,7 @@ async fn should_start_in_isolation() { let spec_override = ConfigsOverride { node_config_override: NodeConfigOverride { sync_handling_override: Some(SyncHandling::Isolated), + idle_tolerance: None, }, ..Default::default() }; @@ -367,6 +368,7 @@ async fn should_be_peerless_in_isolation() { let spec_override = ConfigsOverride { node_config_override: NodeConfigOverride { sync_handling_override: Some(SyncHandling::Isolated), + idle_tolerance: None, }, ..Default::default() }; diff --git a/node/src/reactor/main_reactor/validate.rs b/node/src/reactor/main_reactor/validate.rs index 354c924aa4..8972956d6f 100644 --- a/node/src/reactor/main_reactor/validate.rs +++ b/node/src/reactor/main_reactor/validate.rs @@ -1,3 +1,4 @@ +use casper_types::Timestamp; use std::time::Duration; use tracing::{debug, info, warn}; @@ -34,13 +35,34 @@ impl MainReactor { effect_builder: EffectBuilder, rng: &mut NodeRng, ) -> ValidateInstruction { + if self.force_catchup { + self.force_catchup = false; + return ValidateInstruction::CatchUp; + } let last_progress = self.consensus.last_progress(); if last_progress > self.last_progress { self.last_progress = last_progress; } + let execution_pre_state = self.contract_runtime.execution_pre_state(); + let next_consensus_height = self.consensus.next_executed_height(); + if next_consensus_height != 0 + && next_consensus_height != execution_pre_state.next_block_height() + { + warn!( + "Validate: misalignment of expected block height between consensus and contract runtime" + ); + return ValidateInstruction::CatchUp; + } + let queue_depth = self.contract_runtime.queue_depth(); if queue_depth > 0 { + let idleness = Timestamp::now().saturating_diff(last_progress); + if idleness > self.idle_tolerance { + warn!("Validate: idleness tolerance reached with backed up queue, switching to catch up"); + return ValidateInstruction::CatchUp; + } + warn!("Validate: should_validate queue_depth {}", queue_depth); return ValidateInstruction::CheckLater( "allow time for contract runtime execution to occur".to_string(), diff --git a/node/src/types/block/block_execution_results_or_chunk.rs b/node/src/types/block/block_execution_results_or_chunk.rs index ea31b4b922..0cbbe4567e 100644 --- a/node/src/types/block/block_execution_results_or_chunk.rs +++ b/node/src/types/block/block_execution_results_or_chunk.rs @@ -166,6 +166,15 @@ impl BlockExecutionResultsOrChunk { &self.block_hash } + #[cfg(test)] + pub(crate) fn new_empty_value(block_hash: BlockHash) -> Self { + Self { + block_hash, + value: ValueOrChunk::new(vec![], 0).unwrap(), + is_valid: OnceCell::with_value(Ok(true)), + } + } + #[cfg(test)] pub(crate) fn new_mock_value(rng: &mut TestRng, block_hash: BlockHash) -> Self { Self::new_mock_value_with_multiple_random_results(rng, block_hash, 1) diff --git a/node/src/types/block/executable_block.rs b/node/src/types/block/executable_block.rs index a7f438329c..502a637108 100644 --- a/node/src/types/block/executable_block.rs +++ b/node/src/types/block/executable_block.rs @@ -1,14 +1,17 @@ -use std::{collections::BTreeMap, fmt}; +use super::{FinalizedBlock, InternalEraReport}; +use casper_types::{ + BlockV2, Chainspec, EraId, PublicKey, RewardedSignatures, Timestamp, Transaction, + TransactionHash, AUCTION_LANE_ID, INSTALL_UPGRADE_LANE_ID, MINT_LANE_ID, U512, +}; use datasize::DataSize; +use num_rational::Ratio; use serde::Serialize; - -use casper_types::{ - BlockV2, EraId, PublicKey, RewardedSignatures, Timestamp, Transaction, TransactionHash, - AUCTION_LANE_ID, INSTALL_UPGRADE_LANE_ID, MINT_LANE_ID, U512, +use std::{ + collections::{BTreeMap, HashMap}, + fmt, }; - -use super::{FinalizedBlock, InternalEraReport}; +use tracing::warn; /// Data necessary for a block to be executed. #[derive(DataSize, Debug, Clone, PartialEq, Serialize)] @@ -97,6 +100,74 @@ impl ExecutableBlock { current_gas_price: block.header().current_gas_price(), } } + + pub(crate) fn calc_utilization_score(&self, chainspec: &Chainspec) -> Option { + let cfg = &chainspec.transaction_config.transaction_v1_config; + let per_block_capacity = cfg.get_max_block_count(); + let max_block_size = chainspec.transaction_config.max_block_size as u64; + let block_gas_limit = chainspec.transaction_config.block_gas_limit; + + let mut has_hit_slot_limit = false; + let mut transaction_hash_to_lane_id = HashMap::new(); + + for (lane_id, transactions) in self.transaction_map.iter() { + transaction_hash_to_lane_id.extend( + transactions + .iter() + .map(|transaction| (transaction, *lane_id)), + ); + let max_count = cfg.get_max_transaction_count(*lane_id); + if max_count == transactions.len() as u64 { + has_hit_slot_limit = true; + } + } + + if has_hit_slot_limit { + Some(100u64) + } else if self.transactions.is_empty() { + Some(0u64) + } else { + let size_utilization: u64 = { + let total_size_of_transactions: u64 = self + .transactions + .iter() + .map(|transaction| transaction.size_estimate() as u64) + .sum(); + + Ratio::new(total_size_of_transactions * 100, max_block_size).to_integer() + }; + let gas_utilization: u64 = { + let total_gas_limit: u64 = self + .transactions + .iter() + .map( + |transaction| match transaction_hash_to_lane_id.get(&transaction.hash()) { + Some(lane_id) => match &transaction.gas_limit(chainspec, *lane_id) { + Ok(gas_limit) => gas_limit.value().as_u64(), + Err(_) => { + warn!("Unable to determine gas limit"); + 0u64 + } + }, + None => { + warn!("Unable to determine gas limit"); + 0u64 + } + }, + ) + .sum(); + + Ratio::new(total_gas_limit * 100, block_gas_limit).to_integer() + }; + + let slot_utilization = + Ratio::new(self.transactions.len() as u64 * 100, per_block_capacity).to_integer(); + + let utilization_scores = [slot_utilization, gas_utilization, size_utilization]; + + utilization_scores.iter().max().copied() + } + } } impl fmt::Display for ExecutableBlock { diff --git a/storage/src/block_store/lmdb/indexed_lmdb_block_store.rs b/storage/src/block_store/lmdb/indexed_lmdb_block_store.rs index 2e8a759c9e..2db3030e60 100644 --- a/storage/src/block_store/lmdb/indexed_lmdb_block_store.rs +++ b/storage/src/block_store/lmdb/indexed_lmdb_block_store.rs @@ -542,6 +542,17 @@ impl IndexedLmdbBlockStoreReadTransaction<'_> { .block_signatures_exist(&self.txn, block_hash), }) } + + pub fn get_switch_block_height(&self, era_id: EraId) -> Result, BlockStoreError> { + let index = LmdbBlockStoreIndex::SwitchBlockEraId(IndexPosition::Key(era_id)); + match self.block_hash_from_index(index) { + Some(block_hash) => { + let maybe_header: Option = self.read(*block_hash)?; + Ok(maybe_header.map(|header| header.height())) + } + None => Ok(None), + } + } } impl BlockStoreTransaction for IndexedLmdbBlockStoreReadTransaction<'_> { diff --git a/types/src/block.rs b/types/src/block.rs index 833d7a9ac0..9daa3ba1f4 100644 --- a/types/src/block.rs +++ b/types/src/block.rs @@ -27,7 +27,7 @@ use itertools::Either; #[cfg(feature = "json-schema")] use once_cell::sync::Lazy; #[cfg(feature = "std")] -use std::error::Error as StdError; +use std::{borrow::Borrow, error::Error as StdError}; #[cfg(feature = "datasize")] use datasize::DataSize; @@ -418,18 +418,21 @@ impl Block { /// Returns the utilization of the block against a given chainspec. #[cfg(feature = "std")] - pub fn block_utilization(&self, transaction_config: TransactionConfig) -> u64 { + pub fn block_utilization( + &self, + transaction_config_input: impl Borrow, + ) -> u64 { + let transaction_config = transaction_config_input.borrow(); match self { Block::V1(_) => { // We shouldnt be tracking this for legacy blocks 0 } Block::V2(block_v2) => { - let has_hit_slot_limt = self.has_hit_slot_capacity(transaction_config.clone()); let per_block_capacity = transaction_config .transaction_v1_config .get_max_block_count(); - + let has_hit_slot_limt = self.has_hit_slot_capacity(transaction_config); if has_hit_slot_limt { 100u64 } else { @@ -442,7 +445,11 @@ impl Block { /// Returns true if the block has reached capacity in any of its transaction limit. #[cfg(feature = "std")] - pub fn has_hit_slot_capacity(&self, transaction_config: TransactionConfig) -> bool { + pub fn has_hit_slot_capacity( + &self, + transaction_config_input: impl Borrow, + ) -> bool { + let transaction_config = transaction_config_input.borrow(); match self { Block::V1(_) => false, Block::V2(block_v2) => {