diff --git a/packages/pocket-ic/src/common/rest.rs b/packages/pocket-ic/src/common/rest.rs index e04e4878b60a..042ee0834ce9 100644 --- a/packages/pocket-ic/src/common/rest.rs +++ b/packages/pocket-ic/src/common/rest.rs @@ -656,6 +656,7 @@ pub struct InstanceConfig { pub incomplete_state: Option, pub initial_time: Option, pub mainnet_nns_subnet_id: Option, + pub flexible_ordering: Option, } #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, Default, JsonSchema)] @@ -1238,3 +1239,59 @@ pub struct RawCanisterSnapshotId { #[serde(serialize_with = "base64::serialize")] pub snapshot_id: Vec, } + +// ================================================================================================================= // +// Flexible message ordering types +// ================================================================================================================= // + +#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)] +pub enum RawOrderedMessage { + Ingress { + canister_id: RawCanisterId, + #[serde(deserialize_with = "base64::deserialize")] + #[serde(serialize_with = "base64::serialize")] + message_id: Vec, + }, + Request { + source: RawCanisterId, + target: RawCanisterId, + }, + Response { + source: RawCanisterId, + target: RawCanisterId, + }, + Heartbeat { + canister_id: RawCanisterId, + }, + Timer { + canister_id: RawCanisterId, + }, +} + +#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)] +pub struct RawMessageOrdering { + pub subnet_id: RawSubnetId, + pub messages: Vec, +} + +#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)] +pub struct RawBufferIngressMessage { + pub subnet_id: RawSubnetId, + #[serde(deserialize_with = "base64::deserialize")] + #[serde(serialize_with = "base64::serialize")] + pub sender: Vec, + #[serde(deserialize_with = "base64::deserialize")] + #[serde(serialize_with = "base64::serialize")] + pub canister_id: Vec, + pub method: String, + #[serde(deserialize_with = "base64::deserialize")] + 
#[serde(serialize_with = "base64::serialize")] + pub payload: Vec, +} + +#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)] +pub struct RawBufferedIngressId { + #[serde(deserialize_with = "base64::deserialize")] + #[serde(serialize_with = "base64::serialize")] + pub message_id: Vec, +} diff --git a/packages/pocket-ic/src/lib.rs b/packages/pocket-ic/src/lib.rs index 308dfcf4e0c6..3bcdf8eacaa5 100644 --- a/packages/pocket-ic/src/lib.rs +++ b/packages/pocket-ic/src/lib.rs @@ -175,6 +175,7 @@ pub struct PocketIcBuilder { icp_features: IcpFeatures, initial_time: Option, mainnet_nns_subnet_id: Option, + flexible_ordering: bool, } #[allow(clippy::new_without_default)] @@ -195,6 +196,7 @@ impl PocketIcBuilder { icp_features: IcpFeatures::default(), initial_time: None, mainnet_nns_subnet_id: None, + flexible_ordering: false, } } @@ -220,6 +222,7 @@ impl PocketIcBuilder { self.initial_time, self.http_gateway_config, self.mainnet_nns_subnet_id, + self.flexible_ordering, ) } @@ -239,6 +242,7 @@ impl PocketIcBuilder { self.initial_time, self.http_gateway_config, self.mainnet_nns_subnet_id, + self.flexible_ordering, ) .await } @@ -495,6 +499,12 @@ impl PocketIcBuilder { self.mainnet_nns_subnet_id = Some(true); self } + + /// Enable flexible message ordering on all subnets. + pub fn with_flexible_ordering(mut self) -> Self { + self.flexible_ordering = true; + self + } } /// Representation of system time as duration since UNIX epoch @@ -615,6 +625,7 @@ impl PocketIc { initial_time: Option, http_gateway_config: Option, mainnet_nns_subnet_id: Option, + flexible_ordering: bool, ) -> Self { let (tx, rx) = channel(); let thread = thread::spawn(move || { @@ -642,6 +653,7 @@ impl PocketIc { initial_time, http_gateway_config, mainnet_nns_subnet_id, + flexible_ordering, ) .await }); @@ -938,6 +950,37 @@ impl PocketIc { }) } + /// Buffer an ingress message on a specific subnet for later ordered execution. 
+ pub fn buffer_ingress( + &self, + subnet_id: Principal, + sender: Principal, + canister_id: CanisterId, + method: &str, + payload: Vec, + ) -> Vec { + let runtime = self.runtime.clone(); + runtime.block_on(async { + self.pocket_ic + .buffer_ingress(subnet_id, sender, canister_id, method, payload) + .await + }) + } + + /// Execute messages in a specified order on a given subnet. + pub fn execute_with_ordering( + &self, + subnet_id: Principal, + messages: Vec, + ) { + let runtime = self.runtime.clone(); + runtime.block_on(async { + self.pocket_ic + .execute_with_ordering(subnet_id, messages) + .await + }) + } + /// Await an update call submitted previously by `submit_call` or `submit_call_with_effective_principal`. pub fn await_call(&self, message_id: RawMessageId) -> Result, RejectResponse> { let runtime = self.runtime.clone(); diff --git a/packages/pocket-ic/src/nonblocking.rs b/packages/pocket-ic/src/nonblocking.rs index b6465f7aa76f..6982ef7f35a1 100644 --- a/packages/pocket-ic/src/nonblocking.rs +++ b/packages/pocket-ic/src/nonblocking.rs @@ -4,9 +4,10 @@ use crate::common::rest::{ CreateHttpGatewayResponse, CreateInstanceResponse, ExtendedSubnetConfigSet, HttpGatewayBackend, HttpGatewayConfig, HttpGatewayInfo, HttpsConfig, IcpConfig, IcpFeatures, InitialTime, InstanceConfig, InstanceHttpGatewayConfig, InstanceId, MockCanisterHttpResponse, RawAddCycles, - RawCanisterCall, RawCanisterHttpRequest, RawCanisterId, RawCanisterResult, - RawCanisterSnapshotDownload, RawCanisterSnapshotId, RawCanisterSnapshotUpload, RawCycles, - RawEffectivePrincipal, RawIngressStatusArgs, RawMessageId, RawMockCanisterHttpResponse, + RawBufferIngressMessage, RawBufferedIngressId, RawCanisterCall, RawCanisterHttpRequest, + RawCanisterId, RawCanisterResult, RawCanisterSnapshotDownload, RawCanisterSnapshotId, + RawCanisterSnapshotUpload, RawCycles, RawEffectivePrincipal, RawIngressStatusArgs, + RawMessageId, RawMessageOrdering, RawMockCanisterHttpResponse, RawOrderedMessage, 
RawPrincipalId, RawSetStableMemory, RawStableMemory, RawSubnetId, RawTickConfigs, RawTime, RawVerifyCanisterSigArg, SubnetId, Topology, }; @@ -148,6 +149,7 @@ impl PocketIc { initial_time: Option, http_gateway_config: Option, mainnet_nns_subnet_id: Option, + flexible_ordering: bool, ) -> Self { let server_url = if let Some(server_url) = server_url { server_url @@ -204,6 +206,7 @@ impl PocketIc { incomplete_state: None, initial_time, mainnet_nns_subnet_id, + flexible_ordering: if flexible_ordering { Some(true) } else { None }, }; let test_driver_pid = std::process::id(); @@ -689,6 +692,43 @@ impl PocketIc { self.post(endpoint, raw_canister_call).await } + /// Buffer an ingress message on a specific subnet for later ordered execution. + pub async fn buffer_ingress( + &self, + subnet_id: Principal, + sender: Principal, + canister_id: CanisterId, + method: &str, + payload: Vec, + ) -> Vec { + let raw = RawBufferIngressMessage { + subnet_id: RawSubnetId { + subnet_id: subnet_id.as_slice().to_vec(), + }, + sender: sender.as_slice().to_vec(), + canister_id: canister_id.as_slice().to_vec(), + method: method.to_string(), + payload, + }; + let result: RawBufferedIngressId = self.post("update/buffer_ingress_message", raw).await; + result.message_id + } + + /// Execute messages in a specified order on a given subnet. + pub async fn execute_with_ordering( + &self, + subnet_id: Principal, + messages: Vec, + ) { + let raw = RawMessageOrdering { + subnet_id: RawSubnetId { + subnet_id: subnet_id.as_slice().to_vec(), + }, + messages, + }; + let _: () = self.post("update/execute_with_ordering", raw).await; + } + /// Await an update call submitted previously by `submit_call` or `submit_call_with_effective_principal`. 
pub async fn await_call(&self, message_id: RawMessageId) -> Result, RejectResponse> { let endpoint = "update/await_ingress_message"; diff --git a/packages/pocket-ic/tests/tests.rs b/packages/pocket-ic/tests/tests.rs index 54dbd5667e3c..3cfdce1bbced 100644 --- a/packages/pocket-ic/tests/tests.rs +++ b/packages/pocket-ic/tests/tests.rs @@ -3435,3 +3435,66 @@ fn cloud_engine_default_effective_canister_id() { topology.default_effective_canister_id.clone().into(); assert_eq!(effective_canister_id, default_effective_canister_id); } + +#[test] +fn test_flexible_ordering_basic() { + use pocket_ic::common::rest::{RawCanisterId, RawOrderedMessage}; + + let pic = PocketIcBuilder::new() + .with_application_subnet() + .with_flexible_ordering() + .build(); + + let topology = pic.topology(); + let subnet_id = topology.get_app_subnets()[0]; + + let canister_id = pic.create_canister_on_subnet(None, None, subnet_id); + pic.add_cycles(canister_id, INIT_CYCLES); + pic.install_canister(canister_id, counter_wasm(), vec![], None); + + // Buffer two write calls. + let msg_id_1 = pic.buffer_ingress( + subnet_id, + Principal::anonymous(), + canister_id, + "write", + encode_one(()).unwrap(), + ); + let msg_id_2 = pic.buffer_ingress( + subnet_id, + Principal::anonymous(), + canister_id, + "write", + encode_one(()).unwrap(), + ); + + // Execute them in order via flexible ordering. + pic.execute_with_ordering( + subnet_id, + vec![ + RawOrderedMessage::Ingress { + canister_id: RawCanisterId { + canister_id: canister_id.as_slice().to_vec(), + }, + message_id: msg_id_1, + }, + RawOrderedMessage::Ingress { + canister_id: RawCanisterId { + canister_id: canister_id.as_slice().to_vec(), + }, + message_id: msg_id_2, + }, + ], + ); + + // Counter should be 2 (two writes executed). 
+ let reply = pic + .query_call( + canister_id, + Principal::anonymous(), + "read", + encode_one(()).unwrap(), + ) + .unwrap(); + assert_eq!(reply, vec![2, 0, 0, 0]); +} diff --git a/rs/config/src/subnet_config.rs b/rs/config/src/subnet_config.rs index b5b424292dab..0df2961f9ce5 100644 --- a/rs/config/src/subnet_config.rs +++ b/rs/config/src/subnet_config.rs @@ -64,6 +64,16 @@ const INSTRUCTION_OVERHEAD_PER_CANISTER_FOR_FINALIZATION: NumInstructions = // would cause the longest possible round of 4B instructions or 2 seconds. const MAX_INSTRUCTIONS_PER_ROUND: NumInstructions = NumInstructions::new(4 * B); +/// Ideally we would split the per-round limit between subnet messages and +/// canister messages, so that their sum cannot exceed the limit. That would +/// make the limit for canister messages variable, which would break assumptions +/// of the scheduling algorithm. The next best thing we can do is to limit +/// subnet messages on top of the fixed limit for canister messages. +/// The value of the limit for subnet messages is chosen quite arbitrarily +/// as 1/16 of the fixed limit. Any other value in the same ballpark would +/// work here. +pub const SUBNET_MESSAGES_LIMIT_FRACTION: u64 = 16; + // Limit per `install_code` message. It's bigger than the limit for a regular // update call to allow for canisters with bigger state to be upgraded. const MAX_INSTRUCTIONS_PER_INSTALL_CODE: NumInstructions = NumInstructions::new(300 * B); @@ -274,6 +284,10 @@ pub struct SchedulerConfig { /// Number of instructions to count when uploading or downloading binary snapshot data. pub canister_snapshot_data_baseline_instructions: NumInstructions, + + /// Per-round subnet message instruction budget. + /// Defaults to `max_instructions_per_round / SUBNET_MESSAGES_LIMIT_FRACTION`. 
+ pub subnet_messages_per_round_instruction_limit: NumInstructions, } impl SchedulerConfig { @@ -305,6 +319,8 @@ impl SchedulerConfig { DEFAULT_CANISTERS_SNAPSHOT_BASELINE_INSTRUCTIONS, canister_snapshot_data_baseline_instructions: DEFAULT_CANISTERS_SNAPSHOT_DATA_BASELINE_INSTRUCTIONS, + subnet_messages_per_round_instruction_limit: MAX_INSTRUCTIONS_PER_ROUND + / SUBNET_MESSAGES_LIMIT_FRACTION, } } @@ -314,6 +330,11 @@ impl SchedulerConfig { let max_instructions_per_install_code = NumInstructions::from(1_000 * B); let max_instructions_per_slice = NumInstructions::from(2 * B); let max_instructions_per_install_code_slice = NumInstructions::from(5 * B); + // Round limit is set to allow on average 2B instructions. + // See also comment about `MAX_INSTRUCTIONS_PER_ROUND`. + let max_instructions_per_round = max_instructions_per_slice + .max(max_instructions_per_install_code_slice) + + NumInstructions::from(2 * B); Self { scheduler_cores: NUMBER_OF_EXECUTION_THREADS, max_paused_executions: MAX_PAUSED_EXECUTIONS, @@ -321,11 +342,7 @@ impl SchedulerConfig { // TODO(RUN-993): Enable heap delta rate limiting for system subnets. // Setting initial reserve to capacity effectively disables the rate limiting. heap_delta_initial_reserve: SUBNET_HEAP_DELTA_CAPACITY, - // Round limit is set to allow on average 2B instructions. - // See also comment about `MAX_INSTRUCTIONS_PER_ROUND`. 
- max_instructions_per_round: max_instructions_per_slice - .max(max_instructions_per_install_code_slice) - + NumInstructions::from(2 * B), + max_instructions_per_round, max_instructions_per_message, max_instructions_per_query_message, max_instructions_per_slice, @@ -349,6 +366,8 @@ impl SchedulerConfig { upload_wasm_chunk_instructions: NumInstructions::from(0), canister_snapshot_baseline_instructions: NumInstructions::from(0), canister_snapshot_data_baseline_instructions: NumInstructions::from(0), + subnet_messages_per_round_instruction_limit: max_instructions_per_round + / SUBNET_MESSAGES_LIMIT_FRACTION, } } diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index b15b9f490cf2..b5be01695c94 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -64,16 +64,6 @@ pub(crate) mod test_utilities; #[cfg(test)] pub(crate) mod tests; -/// Ideally we would split the per-round limit between subnet messages and -/// canister messages, so that their sum cannot exceed the limit. That would -/// make the limit for canister messages variable, which would break assumptions -/// of the scheduling algorithm. The next best thing we can do is to limit -/// subnet messages on top of the fixed limit for canister messages. -/// The value of the limit for subnet messages is chosen quite arbitrarily -/// as 1/16 of the fixed limit. Any other value in the same ballpark would -/// work here. -const SUBNET_MESSAGES_LIMIT_FRACTION: u64 = 16; - /// Contains limits (or budget) for various resources that affect duration of /// an execution round. 
#[derive(Clone, Debug, Default)] @@ -1322,7 +1312,7 @@ impl Scheduler for SchedulerImpl { SchedulerRoundLimits { instructions: round_instructions, subnet_instructions: as_round_instructions( - self.config.max_instructions_per_round / SUBNET_MESSAGES_LIMIT_FRACTION, + self.config.subnet_messages_per_round_instruction_limit, ), subnet_available_memory: self.exec_env.scaled_subnet_available_memory(&state), subnet_available_callbacks: self.exec_env.subnet_available_callbacks(&state), diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index 1dca5f41ad83..67e70ef771ab 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -10,7 +10,6 @@ use ic_replicated_state::canister_state::NextExecution; use ic_replicated_state::{CanisterPriority, CanisterState, ReplicatedState}; use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, LongExecutionMode}; use ic_utils::iter::left_outer_join; -use more_asserts::debug_assert_gt; use num_traits::SaturatingSub; use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; @@ -325,7 +324,9 @@ impl RoundSchedule { } let last_prioritized_long = idx; let new_execution_cores = self.scheduler_cores - last_prioritized_long; - debug_assert_gt!(new_execution_cores, 0); + // With scheduler_cores == 1, new_execution_cores can be 0 when there + // is a prioritized long execution occupying the single core. + debug_assert!(self.scheduler_cores <= 1 || new_execution_cores > 0); for canister_id in scheduling_order.new_canisters { let canister_state = canisters.remove(canister_id).unwrap(); canisters_partitioned_by_cores[idx].push(canister_state); @@ -500,16 +501,20 @@ impl RoundSchedule { } } // Assert there is at least `1%` of free capacity to distribute across canisters. 
- // It's guaranteed by `validate_compute_allocation()` - debug_assert_or_critical_error!( - total_compute_allocation + ONE_PERCENT <= compute_capacity, - metrics.scheduler_compute_allocation_invariant_broken, - logger, - "{}: Total compute allocation {}% must be less than compute capacity {}%", - SCHEDULER_COMPUTE_ALLOCATION_INVARIANT_BROKEN, - total_compute_allocation, - compute_capacity - ); + // It's guaranteed by `validate_compute_allocation()`. + // Skip the assertion when compute_capacity is zero (scheduler_cores == 1), + // which can happen in testing configurations like flexible message ordering. + if compute_capacity > ZERO { + debug_assert_or_critical_error!( + total_compute_allocation + ONE_PERCENT <= compute_capacity, + metrics.scheduler_compute_allocation_invariant_broken, + logger, + "{}: Total compute allocation {}% must be less than compute capacity {}%", + SCHEDULER_COMPUTE_ALLOCATION_INVARIANT_BROKEN, + total_compute_allocation, + compute_capacity + ); + } // Observe accumulated priority metrics metrics .scheduler_accumulated_priority_invariant @@ -542,14 +547,17 @@ impl RoundSchedule { - AccumulatedPriority::new(1)) / ONE_HUNDRED_PERCENT) as usize; // If there are long executions, the `long_execution_cores` must be non-zero. - debug_assert_or_critical_error!( - number_of_long_executions == 0 || long_execution_cores > 0, - metrics.scheduler_cores_invariant_broken, - logger, - "{}: Number of long execution cores {} must be more than 0", - SCHEDULER_CORES_INVARIANT_BROKEN, - long_execution_cores, - ); + // Skip assertion when compute_capacity is zero (scheduler_cores == 1). 
+ if compute_capacity > ZERO { + debug_assert_or_critical_error!( + number_of_long_executions == 0 || long_execution_cores > 0, + metrics.scheduler_cores_invariant_broken, + logger, + "{}: Number of long execution cores {} must be more than 0", + SCHEDULER_CORES_INVARIANT_BROKEN, + long_execution_cores, + ); + } // As one scheduler core is reserved, the `long_execution_cores` is always // less than `scheduler_cores` debug_assert_or_critical_error!( diff --git a/rs/execution_environment/src/scheduler/test_utilities.rs b/rs/execution_environment/src/scheduler/test_utilities.rs index c6e228c18c12..8b0eeb604872 100644 --- a/rs/execution_environment/src/scheduler/test_utilities.rs +++ b/rs/execution_environment/src/scheduler/test_utilities.rs @@ -72,8 +72,9 @@ use std::{collections::BTreeSet, time::Duration}; use crate::{ExecutionServicesForTesting, RoundLimits, as_round_instructions}; -use super::{SUBNET_MESSAGES_LIMIT_FRACTION, SchedulerImpl}; +use super::SchedulerImpl; use crate::metrics::MeasurementScope; +use ic_config::subnet_config::SUBNET_MESSAGES_LIMIT_FRACTION; use ic_crypto_prng::{Csprng, RandomnessPurpose::ExecutionThread}; use ic_types::time::UNIX_EPOCH; diff --git a/rs/execution_environment/src/scheduler/tests/limits.rs b/rs/execution_environment/src/scheduler/tests/limits.rs index 4a5d11207ba9..2e568dcfe8e5 100644 --- a/rs/execution_environment/src/scheduler/tests/limits.rs +++ b/rs/execution_environment/src/scheduler/tests/limits.rs @@ -6,7 +6,7 @@ use super::super::test_utilities::{ use super::super::*; use super::{zero_instruction_messages, zero_instruction_overhead_config}; use crate::scheduler::test_utilities::EMPTY_WASM; -use ic_config::subnet_config::SchedulerConfig; +use ic_config::subnet_config::{SUBNET_MESSAGES_LIMIT_FRACTION, SchedulerConfig}; use ic_replicated_state::testing::CanisterQueuesTesting; use ic_replicated_state::{NumWasmPages, num_bytes_try_from}; use proptest::prelude::*; @@ -522,14 +522,17 @@ fn 
subnet_messages_respect_instruction_limit_per_round() { // - 3 subnet messages should run (using 30 out of 400 instructions). // - 10 input messages should run (using 100 out of 400 instructions). + let max_instructions_per_round = NumInstructions::new(400); let mut test = SchedulerTestBuilder::new() .with_scheduler_config(SchedulerConfig { scheduler_cores: 2, - max_instructions_per_round: NumInstructions::new(400), + max_instructions_per_round, max_instructions_per_message: NumInstructions::new(10), max_instructions_per_slice: NumInstructions::new(10), max_instructions_per_install_code: NumInstructions::new(10), max_instructions_per_install_code_slice: NumInstructions::new(10), + subnet_messages_per_round_instruction_limit: max_instructions_per_round + / SUBNET_MESSAGES_LIMIT_FRACTION, ..zero_instruction_overhead_config() }) .build(); diff --git a/rs/execution_environment/src/scheduler/tests/subnet_messages.rs b/rs/execution_environment/src/scheduler/tests/subnet_messages.rs index 53e3e63881ac..da4e15ef246a 100644 --- a/rs/execution_environment/src/scheduler/tests/subnet_messages.rs +++ b/rs/execution_environment/src/scheduler/tests/subnet_messages.rs @@ -6,7 +6,7 @@ use super::super::test_utilities::{ use super::super::*; use super::zero_instruction_overhead_config; use candid::Encode; -use ic_config::subnet_config::SchedulerConfig; +use ic_config::subnet_config::{SUBNET_MESSAGES_LIMIT_FRACTION, SchedulerConfig}; use ic_management_canister_types_private::{ CanisterIdRecord, EmptyBlob, FetchCanisterLogsRequest, Method, Payload as _, }; diff --git a/rs/pocket_ic_server/src/pocket_ic.rs b/rs/pocket_ic_server/src/pocket_ic.rs index bc3c044aba54..a42f2ceae8db 100644 --- a/rs/pocket_ic_server/src/pocket_ic.rs +++ b/rs/pocket_ic_server/src/pocket_ic.rs @@ -105,9 +105,9 @@ use ic_sns_wasm::init::SnsWasmCanisterInitPayloadBuilder; use ic_sns_wasm::pb::v1::add_wasm_response::Result as AddWasmResult; use ic_sns_wasm::pb::v1::{AddWasmRequest, AddWasmResponse, 
SnsCanisterType, SnsWasm}; use ic_state_machine_tests::{ - FakeVerifier, StateMachine, StateMachineBuilder, StateMachineConfig, StateMachineStateDir, - SubmitIngressError, Subnets, WasmResult, add_global_registry_records, - add_initial_registry_records, + FakeVerifier, MessageOrdering, OrderedMessage, StateMachine, StateMachineBuilder, + StateMachineConfig, StateMachineStateDir, SubmitIngressError, Subnets, WasmResult, + add_global_registry_records, add_initial_registry_records, }; use ic_state_manager::StateManagerImpl; use ic_types::batch::BlockmakerMetrics; @@ -139,8 +139,9 @@ use pocket_ic::common::rest::{ self, BinaryBlob, BlobCompression, CanisterHttpHeader, CanisterHttpMethod, CanisterHttpRequest, CanisterHttpResponse, ExtendedSubnetConfigSet, IcpConfig, IcpConfigFlag, IcpFeatures, IcpFeaturesConfig, IncompleteStateFlag, MockCanisterHttpResponse, RawAddCycles, - RawCanisterCall, RawCanisterId, RawEffectivePrincipal, RawMessageId, RawSetStableMemory, - SubnetInstructionConfig, SubnetKind, Topology, + RawBufferIngressMessage, RawCanisterCall, RawCanisterId, RawEffectivePrincipal, RawMessageId, + RawMessageOrdering, RawOrderedMessage, RawSetStableMemory, SubnetInstructionConfig, SubnetKind, + Topology, }; use pocket_ic::{ErrorCode, RejectCode, RejectResponse, copy_dir}; use registry_canister::init::RegistryCanisterInitPayloadBuilder; @@ -622,6 +623,7 @@ struct PocketIcSubnets { synced_registry_version: RegistryVersion, _bitcoin_adapter_parts: Option, _dogecoin_adapter_parts: Option, + flexible_ordering: bool, } impl PocketIcSubnets { @@ -640,6 +642,7 @@ impl PocketIcSubnets { log_level: Option, bitcoin_adapter_uds_path: Option, dogecoin_adapter_uds_path: Option, + flexible_ordering: bool, ) -> StateMachineBuilder { let subnet_type = conv_type(subnet_kind); let subnet_size = subnet_size(subnet_kind); @@ -720,9 +723,13 @@ impl PocketIcSubnets { if let Some(subnet_admins) = subnet_admins { builder = builder.with_subnet_admins(subnet_admins); } + if 
flexible_ordering { + builder = builder.with_flexible_ordering(); + } builder } + #[allow(clippy::too_many_arguments)] fn new( runtime: Arc, state_dir: PocketIcStateDir, @@ -736,6 +743,7 @@ impl PocketIcSubnets { gateway_port: Option, registry_data_provider: Arc, synced_registry_version: Option, + flexible_ordering: bool, ) -> Self { let routing_table = RoutingTable::new(); let chain_keys = BTreeMap::new(); @@ -766,6 +774,7 @@ impl PocketIcSubnets { synced_registry_version, _bitcoin_adapter_parts: None, _dogecoin_adapter_parts: None, + flexible_ordering, } } @@ -884,6 +893,7 @@ impl PocketIcSubnets { self.log_level, bitcoin_adapter_uds_path.clone(), dogecoin_adapter_uds_path.clone(), + self.flexible_ordering, ); if let Some(subnet_id) = subnet_id { @@ -2804,6 +2814,7 @@ impl PocketIc { auto_progress_enabled: bool, gateway_port: Option, mainnet_nns_subnet_id: bool, + flexible_ordering: bool, ) -> Result { if let Some(time) = initial_time { let systime: SystemTime = time.into(); @@ -3128,6 +3139,7 @@ impl PocketIc { gateway_port, registry_data_provider, synced_registry_version, + flexible_ordering, ); let mut subnet_configs = Vec::new(); for subnet_config_info in subnet_config_info.into_iter() { @@ -4313,6 +4325,83 @@ impl Operation for SubmitIngressMessage { } } +#[derive(Clone, Debug)] +pub struct BufferIngressMessage { + pub subnet_id: SubnetId, + pub sender: PrincipalId, + pub canister_id: CanisterId, + pub method: String, + pub payload: Vec, +} + +impl Operation for BufferIngressMessage { + fn compute(&self, pic: &mut PocketIc) -> OpOut { + let sm = match pic.subnets.get(self.subnet_id) { + Some(sm) => sm, + None => { + return OpOut::Error(PocketIcError::SubnetNotFound(self.subnet_id.get().0)); + } + }; + match sm.buffer_ingress_as( + self.sender, + self.canister_id, + self.method.clone(), + self.payload.clone(), + ) { + Ok(msg_id) => OpOut::Bytes(msg_id.as_bytes().to_vec()), + Err(e) => OpOut::Error(PocketIcError::BadIngressMessage(format!("{e:?}"))), + } + } + 
+ fn id(&self) -> OpId { + OpId(format!( + "buffer_ingress({},{},{})", + self.sender, self.canister_id, self.method + )) + } +} + +#[derive(Clone, Debug)] +pub struct ExecuteWithOrdering { + pub subnet_id: SubnetId, + pub messages: Vec, +} + +impl Operation for ExecuteWithOrdering { + fn compute(&self, pic: &mut PocketIc) -> OpOut { + let sm = match pic.subnets.get(self.subnet_id) { + Some(sm) => sm, + None => { + return OpOut::Error(PocketIcError::SubnetNotFound(self.subnet_id.get().0)); + } + }; + let ordering = MessageOrdering::new(self.messages.clone()); + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + sm.execute_with_ordering(ordering); + })) { + Ok(()) => OpOut::NoOutput, + Err(e) => { + let msg = if let Some(s) = e.downcast_ref::() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else { + "Unknown panic during execute_with_ordering".to_string() + }; + OpOut::Error(PocketIcError::InvalidOrdering(msg)) + } + } + } + + fn id(&self) -> OpId { + OpId(format!( + "execute_with_ordering(subnet={},steps={})", + self.subnet_id, + self.messages.len() + )) + } +} + #[derive(Clone, Debug)] pub struct MessageId { effective_principal: EffectivePrincipal, @@ -5311,6 +5400,112 @@ impl TryFrom for AddCycles { } } +impl TryFrom for BufferIngressMessage { + type Error = ConversionError; + fn try_from(raw: RawBufferIngressMessage) -> Result { + let subnet_id = + SubnetId::new(PrincipalId::try_from(raw.subnet_id.subnet_id).map_err(|_| { + ConversionError { + message: "Bad subnet id".to_string(), + } + })?); + let sender = PrincipalId::try_from(raw.sender).map_err(|_| ConversionError { + message: "Bad sender".to_string(), + })?; + let canister_id = CanisterId::try_from(raw.canister_id).map_err(|_| ConversionError { + message: "Bad canister id".to_string(), + })?; + Ok(BufferIngressMessage { + subnet_id, + sender, + canister_id, + method: raw.method, + payload: raw.payload, + }) + } +} + +impl TryFrom for ExecuteWithOrdering 
{ + type Error = ConversionError; + fn try_from(raw: RawMessageOrdering) -> Result { + let subnet_id = + SubnetId::new(PrincipalId::try_from(raw.subnet_id.subnet_id).map_err(|_| { + ConversionError { + message: "Bad subnet id".to_string(), + } + })?); + let messages = raw + .messages + .into_iter() + .map(|m| match m { + RawOrderedMessage::Ingress { + canister_id, + message_id, + } => { + let cid = CanisterId::try_from(canister_id.canister_id).map_err(|_| { + ConversionError { + message: "Bad canister id".to_string(), + } + })?; + let mid = + OtherMessageId::try_from(&message_id[..]).map_err(|_| ConversionError { + message: "Bad message id".to_string(), + })?; + Ok(OrderedMessage::Ingress(cid, mid)) + } + RawOrderedMessage::Request { source, target } => { + let s = + CanisterId::try_from(source.canister_id).map_err(|_| ConversionError { + message: "Bad source canister id".to_string(), + })?; + let t = + CanisterId::try_from(target.canister_id).map_err(|_| ConversionError { + message: "Bad target canister id".to_string(), + })?; + Ok(OrderedMessage::Request { + source: s, + target: t, + }) + } + RawOrderedMessage::Response { source, target } => { + let s = + CanisterId::try_from(source.canister_id).map_err(|_| ConversionError { + message: "Bad source canister id".to_string(), + })?; + let t = + CanisterId::try_from(target.canister_id).map_err(|_| ConversionError { + message: "Bad target canister id".to_string(), + })?; + Ok(OrderedMessage::Response { + source: s, + target: t, + }) + } + RawOrderedMessage::Heartbeat { canister_id } => { + let cid = CanisterId::try_from(canister_id.canister_id).map_err(|_| { + ConversionError { + message: "Bad canister id".to_string(), + } + })?; + Ok(OrderedMessage::Heartbeat(cid)) + } + RawOrderedMessage::Timer { canister_id } => { + let cid = CanisterId::try_from(canister_id.canister_id).map_err(|_| { + ConversionError { + message: "Bad canister id".to_string(), + } + })?; + Ok(OrderedMessage::Timer(cid)) + } + }) + .collect::, 
_>>()?; + Ok(ExecuteWithOrdering { + subnet_id, + messages, + }) + } +} + impl Operation for AddCycles { fn compute(&self, pic: &mut PocketIc) -> OpOut { let subnet = pic.try_route_canister(self.canister_id); diff --git a/rs/pocket_ic_server/src/state_api/routes.rs b/rs/pocket_ic_server/src/state_api/routes.rs index 18e9e14cd6c9..0b773e6beb4f 100644 --- a/rs/pocket_ic_server/src/state_api/routes.rs +++ b/rs/pocket_ic_server/src/state_api/routes.rs @@ -44,11 +44,11 @@ use pocket_ic::RejectResponse; use pocket_ic::common::rest::{ self, ApiResponse, AutoProgressConfig, ExtendedSubnetConfigSet, HttpGatewayConfig, HttpGatewayDetails, IcpConfig, IcpFeatures, InitialTime, InstanceConfig, - MockCanisterHttpResponse, RawAddCycles, RawCanisterCall, RawCanisterHttpRequest, RawCanisterId, - RawCanisterResult, RawCanisterSnapshotDownload, RawCanisterSnapshotId, - RawCanisterSnapshotUpload, RawCycles, RawIngressStatusArgs, RawMessageId, - RawMockCanisterHttpResponse, RawPrincipalId, RawSetStableMemory, RawStableMemory, RawSubnetId, - RawTickConfigs, RawTime, Topology, + MockCanisterHttpResponse, RawAddCycles, RawBufferIngressMessage, RawCanisterCall, + RawCanisterHttpRequest, RawCanisterId, RawCanisterResult, RawCanisterSnapshotDownload, + RawCanisterSnapshotId, RawCanisterSnapshotUpload, RawCycles, RawIngressStatusArgs, + RawMessageId, RawMessageOrdering, RawMockCanisterHttpResponse, RawPrincipalId, + RawSetStableMemory, RawStableMemory, RawSubnetId, RawTickConfigs, RawTime, Topology, }; use serde::Serialize; use slog::Level; @@ -145,6 +145,14 @@ where "/canister_snapshot_upload", post(handler_canister_snapshot_upload), ) + .directory_route( + "/buffer_ingress_message", + post(handler_buffer_ingress_message), + ) + .directory_route( + "/execute_with_ordering", + post(handler_execute_with_ordering), + ) } async fn handle_limit_error(req: Request, next: Next) -> Response { @@ -547,6 +555,16 @@ impl TryFrom for Vec { } } +impl TryFrom for rest::RawBufferedIngressId { + type Error 
= OpConversionError; + fn try_from(value: OpOut) -> Result { + match value { + OpOut::Bytes(bytes) => Ok(rest::RawBufferedIngressId { message_id: bytes }), + _ => Err(OpConversionError), + } + } +} + impl TryFrom for Result { type Error = OpConversionError; fn try_from(value: OpOut) -> Result { @@ -1411,6 +1429,48 @@ pub async fn handler_tick( (code, Json(res)) } +pub async fn handler_buffer_ingress_message( + State(AppState { api_state, .. }): State, + Path(instance_id): Path, + headers: HeaderMap, + extract::Json(raw): extract::Json, +) -> (StatusCode, Json>) { + let timeout = timeout_or_default(headers); + match crate::pocket_ic::BufferIngressMessage::try_from(raw) { + Ok(op) => { + let (code, response) = run_operation(api_state, instance_id, timeout, op).await; + (code, Json(response)) + } + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(ApiResponse::Error { + message: format!("{e:?}"), + }), + ), + } +} + +pub async fn handler_execute_with_ordering( + State(AppState { api_state, .. }): State, + Path(instance_id): Path, + headers: HeaderMap, + extract::Json(raw): extract::Json, +) -> (StatusCode, Json>) { + let timeout = timeout_or_default(headers); + match crate::pocket_ic::ExecuteWithOrdering::try_from(raw) { + Ok(op) => { + let (code, response) = run_operation(api_state, instance_id, timeout, op).await; + (code, Json(response)) + } + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(ApiResponse::Error { + message: format!("{e:?}"), + }), + ), + } +} + // ----------------------------------------------------------------------------------------------------------------- // // Other handlers @@ -1586,6 +1646,7 @@ pub async fn create_instance( auto_progress_enabled, gateway_port, instance_config.mainnet_nns_subnet_id.unwrap_or_default(), + instance_config.flexible_ordering.unwrap_or(false), ) }, auto_progress, diff --git a/rs/pocket_ic_server/src/state_api/state.rs b/rs/pocket_ic_server/src/state_api/state.rs index 7bc3a657cb1a..5ad9112dc57d 100644 --- 
a/rs/pocket_ic_server/src/state_api/state.rs +++ b/rs/pocket_ic_server/src/state_api/state.rs @@ -281,6 +281,7 @@ pub enum PocketIcError { BlockmakerContainedInFailed(NodeId), InvalidCanisterSnapshotDirectory(String), CanisterSnapshotError(String), + InvalidOrdering(String), } impl std::fmt::Debug for OpOut { @@ -352,6 +353,9 @@ impl std::fmt::Debug for OpOut { OpOut::Error(PocketIcError::CanisterSnapshotError(msg)) => { write!(f, "CanisterSnapshotError({msg})") } + OpOut::Error(PocketIcError::InvalidOrdering(msg)) => { + write!(f, "InvalidOrdering({msg})") + } OpOut::Bytes(bytes) => write!(f, "Bytes({})", base64::encode(bytes)), OpOut::StableMemBytes(bytes) => write!(f, "StableMemory({})", base64::encode(bytes)), OpOut::MaybeSubnetId(Some(subnet_id)) => write!(f, "SubnetId({subnet_id})"), diff --git a/rs/state_machine_tests/BUILD.bazel b/rs/state_machine_tests/BUILD.bazel index de87f8b818ce..7fe3a83e7e85 100644 --- a/rs/state_machine_tests/BUILD.bazel +++ b/rs/state_machine_tests/BUILD.bazel @@ -129,6 +129,7 @@ rust_ic_test( name = "state_machine_integration_tests", srcs = [ "tests/dts.rs", + "tests/flexible_ordering.rs", "tests/mod.rs", "tests/multi_subnet.rs", "tests/reject_remote_callbacks.rs", diff --git a/rs/state_machine_tests/src/lib.rs b/rs/state_machine_tests/src/lib.rs index 62336399baa9..df25db655055 100644 --- a/rs/state_machine_tests/src/lib.rs +++ b/rs/state_machine_tests/src/lib.rs @@ -38,7 +38,7 @@ use ic_interfaces::{ consensus_pool::ConsensusTime, execution_environment::{ IngressFilterService, IngressHistoryReader, QueryExecutionInput, QueryExecutionService, - TransformExecutionService, + Scheduler, TransformExecutionService, }, ingress_pool::{ IngressPool, IngressPoolObject, PoolSection, UnvalidatedIngressArtifact, @@ -59,7 +59,7 @@ use ic_limits::{MAX_INGRESS_TTL, PERMITTED_DRIFT, SMALL_APP_SUBNET_MAX_SIZE}; use ic_logger::replica_logger::test_logger; use ic_logger::{ReplicaLogger, error}; use ic_management_canister_types_private::{ - self as 
ic00, CanisterIdRecord, CanisterSnapshotDataKind, CanisterSnapshotDataOffset, + self as ic00, CanisterIdRecord, CanisterSnapshotDataKind, CanisterSnapshotDataOffset, IC_00, InstallCodeArgs, ListCanisterSnapshotArgs, ListCanisterSnapshotResponse, MasterPublicKeyId, Method, Payload, ReadCanisterSnapshotDataArgs, ReadCanisterSnapshotDataResponse, ReadCanisterSnapshotMetadataArgs, ReadCanisterSnapshotMetadataResponse, @@ -119,11 +119,13 @@ use ic_replicated_state::{ CheckpointLoadingMetrics, Memory, PageMap, ReplicatedState, canister_state::{ NextExecution, NumWasmPages, WASM_PAGE_SIZE_IN_BYTES, - canister_snapshots::CanisterSnapshots, system_state::CanisterHistory, + canister_snapshots::CanisterSnapshots, execution_state::NextScheduledMethod, + system_state::CanisterHistory, }, metadata_state::subnet_call_context_manager::{SignWithThresholdContext, ThresholdArguments}, page_map::Buffer, replicated_state::ReplicatedStateMessageRouting, + testing::{CanisterQueuesTesting, ReplicatedStateTesting}, }; use ic_state_layout::{CheckpointLayout, ReadOnly}; use ic_state_manager::StateManagerImpl; @@ -138,9 +140,9 @@ use ic_test_utilities_registry::{ use ic_test_utilities_time::FastForwardTimeSource; pub use ic_types::ingress::WasmResult; use ic_types::{ - CanisterId, CanisterLog, CountBytes, CryptoHashOfPartialState, CryptoHashOfState, Height, - NodeId, NumBytes, PrincipalId, Randomness, RegistryVersion, ReplicaVersion, SnapshotId, - SubnetId, UserId, + AccumulatedPriority, CanisterId, CanisterLog, CountBytes, CryptoHashOfPartialState, + CryptoHashOfState, Height, NodeId, NumBytes, PrincipalId, Randomness, RegistryVersion, + ReplicaVersion, SnapshotId, SubnetId, UserId, artifact::IngressMessageId, batch::{ Batch, BatchContent, BatchMessages, BatchSummary, BlockmakerMetrics, ChainKeyData, @@ -237,6 +239,109 @@ impl Verifier for FakeVerifier { } } +/// Tasks not listed here may execute implicitly during any step: +/// - DTS continuation (PausedExecution) runs with highest priority 
+/// - OnLowWasmMemory hook runs before heartbeat/timer/messages if triggered +/// - induct_messages_on_same_subnet moves output → input queues each round +/// - build_streams routes output to loopback each round +#[derive(Clone, Debug)] +pub enum OrderedMessage { + Ingress(CanisterId, MessageId), + Request { + source: CanisterId, + target: CanisterId, + }, + Response { + source: CanisterId, + target: CanisterId, + }, + Heartbeat(CanisterId), + Timer(CanisterId), +} + +pub struct MessageOrdering { + pub messages: Vec, +} + +impl MessageOrdering { + pub fn new(messages: Vec) -> Self { + Self { messages } + } +} + +struct FlexibleOrderingScheduler { + inner: Box>, + target: Arc>>, + suppress_subnet_messages: Arc>, +} + +impl Scheduler for FlexibleOrderingScheduler { + type State = ReplicatedState; + + fn execute_round( + &self, + mut state: ReplicatedState, + randomness: Randomness, + chain_key_data: ic_types::batch::ChainKeyData, + replica_version: &ReplicaVersion, + current_round: ic_types::ExecutionRound, + round_summary: Option, + current_round_type: ic_interfaces::execution_environment::ExecutionRoundType, + registry_settings: &ic_interfaces::execution_environment::RegistryExecutionSettings, + ) -> ReplicatedState { + let target = self.target.write().unwrap().take(); + if let Some(canister_id) = target { + let zero = AccumulatedPriority::new(0); + for (_, p) in state.metadata.subnet_schedule.iter_mut() { + p.accumulated_priority = zero; + p.priority_credit = zero; + } + state + .metadata + .subnet_schedule + .get_mut(canister_id) + .accumulated_priority = AccumulatedPriority::new(i64::MAX / 4); + } + + // Safe to restore: no new messages land in subnet_queues during + // execute_round — they stay in output until build_streams. 
+ let suppress = *self.suppress_subnet_messages.read().unwrap(); + let saved_subnet_queues = if suppress { + Some(std::mem::take(state.subnet_queues_mut())) + } else { + None + }; + + let mut result = self.inner.execute_round( + state, + randomness, + chain_key_data, + replica_version, + current_round, + round_summary, + current_round_type, + registry_settings, + ); + + if let Some(saved) = saved_subnet_queues { + result.put_subnet_queues(saved); + } + + if target.is_some() { + let zero = AccumulatedPriority::new(0); + for (_, p) in result.metadata.subnet_schedule.iter_mut() { + p.accumulated_priority = zero; + p.priority_credit = zero; + } + } + result + } + + fn checkpoint_round_with_no_execution(&self, state: &mut ReplicatedState) { + self.inner.checkpoint_round_with_no_execution(state) + } +} + /// Adds global registry records to the registry managed by the registry data provider: /// - root subnet record; /// - routing table record; @@ -1132,6 +1237,10 @@ pub struct StateMachine { cycles_account_manager: Arc, cost_schedule: CanisterCyclesCostSchedule, hypervisor_config: HypervisorConfig, + flexible_ordering: bool, + ordering_target: Arc>>, + suppress_subnet_messages: Arc>, + ingress_buffer: RwLock>, } impl Default for StateMachine { @@ -1214,6 +1323,7 @@ pub struct StateMachineBuilder { cost_schedule: CanisterCyclesCostSchedule, subnet_admins: Vec, resource_limits: ResourceLimits, + flexible_ordering: bool, } impl StateMachineBuilder { @@ -1256,6 +1366,7 @@ impl StateMachineBuilder { cost_schedule: CanisterCyclesCostSchedule::Normal, subnet_admins: vec![], resource_limits: Default::default(), + flexible_ordering: false, } } @@ -1502,6 +1613,13 @@ impl StateMachineBuilder { } } + pub fn with_flexible_ordering(self) -> Self { + Self { + flexible_ordering: true, + ..self + } + } + /// If a registry version is provided, then new registry records are created for the `StateMachine` /// at the provided registry version. 
/// Otherwise, no new registry records are created. @@ -1548,6 +1666,7 @@ impl StateMachineBuilder { self.cost_schedule, self.subnet_admins, self.resource_limits, + self.flexible_ordering, ) } @@ -1917,6 +2036,7 @@ impl StateMachine { cost_schedule: CanisterCyclesCostSchedule, subnet_admins: Vec, resource_limits: ResourceLimits, + flexible_ordering: bool, ) -> Self { let checkpoint_interval_length = checkpoint_interval_length.unwrap_or(match subnet_type { SubnetType::Application | SubnetType::VerifiedApplication | SubnetType::CloudEngine => { @@ -1932,6 +2052,20 @@ impl StateMachine { Some(config) => (config.subnet_config, config.hypervisor_config), None => (SubnetConfig::new(subnet_type), HypervisorConfig::default()), }; + // One canister, one message per round. Canister budget = + // max_instructions_per_round - max_slice + 1 = 1. + // Exhausted after one message + overhead, blocking a second. + // DTS per-slice limit is independent of round budget. + if flexible_ordering { + let sc = &mut subnet_config.scheduler_config; + sc.scheduler_cores = 1; + // Scheduler computes canister budget as: + // max_instructions_per_round - max(slice, install_code_slice) + 1 + // Cap install_code_slice so it doesn't inflate the subtraction. 
+ sc.max_instructions_per_install_code_slice = sc.max_instructions_per_slice; + sc.max_instructions_per_round = sc.max_instructions_per_slice; + sc.subnet_messages_per_round_instruction_limit = ic_types::NumInstructions::from(1); + } if let Some(ecdsa_signature_fee) = ecdsa_signature_fee { subnet_config .cycles_account_manager_config @@ -2064,11 +2198,23 @@ impl StateMachine { ) }); + let ordering_target: Arc>> = Arc::new(RwLock::new(None)); + let suppress_subnet_messages: Arc> = Arc::new(RwLock::new(false)); + let scheduler: Box> = if flexible_ordering { + Box::new(FlexibleOrderingScheduler { + inner: execution_services.scheduler, + target: Arc::clone(&ordering_target), + suppress_subnet_messages: Arc::clone(&suppress_subnet_messages), + }) + } else { + execution_services.scheduler + }; + let message_routing = SyncMessageRouting::new( Arc::clone(&state_manager) as _, Arc::clone(&state_manager) as _, Arc::clone(&execution_services.ingress_history_writer) as _, - execution_services.scheduler, + scheduler, hypervisor_config.clone(), Arc::clone(&execution_services.cycles_account_manager), subnet_id, @@ -2298,6 +2444,10 @@ impl StateMachine { cycles_account_manager: execution_services.cycles_account_manager, cost_schedule, hypervisor_config, + flexible_ordering, + ordering_target, + suppress_subnet_messages, + ingress_buffer: RwLock::new(BTreeMap::new()), } } @@ -2582,6 +2732,240 @@ impl StateMachine { .push(msg, self.get_time(), self.nodes[0].node_id); } + pub fn set_ordering_target(&self, target: Option) { + *self.ordering_target.write().unwrap() = target; + } + + fn set_suppress_subnet_messages(&self, suppress: bool) { + *self.suppress_subnet_messages.write().unwrap() = suppress; + } + + /// Reset flexible ordering state so subsequent tick()/execute_ingress() + /// calls pass through the scheduler without priority boost or suppression. 
+ pub fn clear_flexible_ordering(&self) { + self.set_ordering_target(None); + self.set_suppress_subnet_messages(false); + } + + pub fn buffer_ingress_as( + &self, + sender: PrincipalId, + canister_id: CanisterId, + method: impl ToString, + payload: Vec, + ) -> Result { + let msg = self.ingress_message(sender, canister_id, method, payload); + let message_id = msg.id(); + self.ingress_buffer + .write() + .unwrap() + .insert(message_id.clone(), msg); + Ok(message_id) + } + + pub fn take_buffered_ingress(&self, message_id: &MessageId) -> SignedIngress { + self.ingress_buffer + .write() + .unwrap() + .remove(message_id) + .unwrap_or_else(|| panic!("No buffered ingress with id {}", message_id)) + } + + pub fn execute_with_ordering(&self, ordering: MessageOrdering) { + assert!( + self.flexible_ordering, + "call with_flexible_ordering() first" + ); + const MAX_TICKS: usize = 100; + + for msg in ordering.messages { + match msg { + OrderedMessage::Ingress(target, ref ingress_id) if target == IC_00 => { + self.set_suppress_subnet_messages(false); + let signed = self.take_buffered_ingress(ingress_id); + let payload = PayloadBuilder::new() + .with_max_expiry_time_from_now(self.get_time().into()) + .signed_ingress(signed); + self.tick_with_config(payload); + + for tick in 0..MAX_TICKS { + match self.ingress_status(ingress_id) { + IngressStatus::Known { + state: IngressState::Completed(_) | IngressState::Failed(_), + .. + } => break, + IngressStatus::Known { + state: IngressState::Processing, + .. 
+ } if !self.has_in_flight_work(target) => break, + _ => {} + } + assert!( + tick < MAX_TICKS - 1, + "Ingress {} stuck: {:?}", + ingress_id, + self.ingress_status(ingress_id), + ); + self.tick(); + } + } + + OrderedMessage::Ingress(target, ref ingress_id) => { + self.set_suppress_subnet_messages(true); + self.set_next_scheduled_method(target, NextScheduledMethod::Message); + self.set_ordering_target(Some(target)); + let signed = self.take_buffered_ingress(ingress_id); + let payload = PayloadBuilder::new() + .with_max_expiry_time_from_now(self.get_time().into()) + .signed_ingress(signed); + self.tick_with_config(payload); + self.complete_dts(target, MAX_TICKS); + } + + OrderedMessage::Heartbeat(canister) | OrderedMessage::Timer(canister) => { + self.set_suppress_subnet_messages(true); + let method = match msg { + OrderedMessage::Heartbeat(_) => NextScheduledMethod::Heartbeat, + OrderedMessage::Timer(_) => NextScheduledMethod::GlobalTimer, + _ => unreachable!(), + }; + self.set_next_scheduled_method(canister, method); + self.set_ordering_target(Some(canister)); + self.tick(); + self.complete_dts(canister, MAX_TICKS); + } + + OrderedMessage::Request { source, target } if target == IC_00 => { + assert!( + self.next_sender_in_subnet_queue(source, target), + "No message from {} to IC_00 in subnet_queues or loopback", + source, + ); + self.set_suppress_subnet_messages(false); + self.tick(); + self.set_suppress_subnet_messages(true); + self.complete_dts(source, MAX_TICKS); + } + + OrderedMessage::Response { source, target } if source == IC_00 => { + assert!( + self.next_sender_in_subnet_queue(source, target), + "No response from IC_00 to {} in queue or loopback", + target, + ); + self.set_suppress_subnet_messages(true); + self.set_next_scheduled_method(target, NextScheduledMethod::Message); + self.set_ordering_target(Some(target)); + self.tick(); + self.complete_dts(target, MAX_TICKS); + } + + OrderedMessage::Request { source, target } + | OrderedMessage::Response { 
source, target } => { + self.set_suppress_subnet_messages(true); + assert!( + self.next_sender_in_queue(source, target), + "Message from {} not next in {}'s queue", + source, + target, + ); + self.set_next_scheduled_method(target, NextScheduledMethod::Message); + self.set_ordering_target(Some(target)); + self.tick(); + self.complete_dts(target, MAX_TICKS); + } + } + } + } + + fn set_next_scheduled_method(&self, canister_id: CanisterId, method: NextScheduledMethod) { + let (_, mut state) = self.state_manager.take_tip(); + let canister = state + .canister_state_make_mut(&canister_id) + .unwrap_or_else(|| panic!("Canister {} not found", canister_id)); + if let Some(es) = canister.execution_state.as_mut() { + es.next_scheduled_method = method; + } + self.state_manager + .commit_and_certify(state, CertificationScope::Metadata, None); + } + + fn complete_dts(&self, canister_id: CanisterId, max_ticks: usize) { + for tick in 0..max_ticks { + match self + .get_latest_state() + .canister_state(&canister_id) + .map(|c| c.next_execution()) + { + Some(NextExecution::ContinueLong | NextExecution::ContinueInstallCode) => { + self.set_ordering_target(Some(canister_id)); + self.tick(); + } + _ => return, + } + assert!(tick < max_ticks - 1, "DTS on {} stuck", canister_id); + } + } + + fn has_in_flight_work(&self, canister: CanisterId) -> bool { + let state = self.get_latest_state(); + + if let Some(c) = state.canister_state(&canister) { + if c.system_state.queues().has_output() || c.system_state.queues().has_input() { + return true; + } + match c.next_execution() { + NextExecution::ContinueLong | NextExecution::ContinueInstallCode => return true, + _ => {} + } + } + + self.has_loopback_messages() || state.subnet_queues().has_input() + } + + /// Checks that `source` is the next sender in `target`'s local input schedule. 
+ fn next_sender_in_queue(&self, source: CanisterId, target: CanisterId) -> bool { + let state = self.get_latest_state(); + let queues = match state.canister_state(&target) { + Some(c) => c.system_state.queues(), + None => return false, + }; + queues.local_sender_schedule().front() == Some(&source) + } + + /// Checks that a message involving IC_00 is available: either in + /// subnet_queues (local schedule for requests to IC_00, remote schedule + /// for responses from IC_00) or in the loopback stream (not yet + /// inducted by Demux). + fn next_sender_in_subnet_queue(&self, source: CanisterId, target: CanisterId) -> bool { + let state = self.get_latest_state(); + + let in_schedule = match (source, target) { + (IC_00, _) => { + // IC_00 response: lands in remote schedule because IC_00's + // principal doesn't match own_subnet_id or any canister. + state.subnet_queues().remote_sender_schedule().front() == Some(&IC_00) + } + (source, IC_00) => { + // Request to IC_00 from a canister + state.subnet_queues().local_sender_schedule().front() == Some(&source) + } + (_, _) => { + panic!("Management canister is not involved; use next_sender_in_queue"); + } + }; + in_schedule || self.has_loopback_messages() + } + + fn has_loopback_messages(&self) -> bool { + let state = self.get_latest_state(); + let subnet_id = self.get_subnet_id(); + state + .streams() + .get(&subnet_id) + .is_some_and(|s| !s.messages().is_empty()) + } + pub fn mock_canister_http_response( &self, request_id: u64, diff --git a/rs/state_machine_tests/tests/flexible_ordering.rs b/rs/state_machine_tests/tests/flexible_ordering.rs new file mode 100644 index 000000000000..4976864512e0 --- /dev/null +++ b/rs/state_machine_tests/tests/flexible_ordering.rs @@ -0,0 +1,954 @@ +use ic_base_types::PrincipalId; +use ic_management_canister_types_private::{ + CanisterIdRecord, CanisterInstallMode, IC_00, InstallCodeArgs, Method, Payload, + ProvisionalCreateCanisterWithCyclesArgs, +}; +use ic_state_machine_tests::{ + 
MessageOrdering, OrderedMessage, StateMachine, StateMachineBuilder, WasmResult, +}; +use ic_types::ingress::{IngressState, IngressStatus}; +use ic_types_cycles::Cycles; +use ic_universal_canister::{CallArgs, CallInterface, UNIVERSAL_CANISTER_WASM, management, wasm}; +use std::panic; + +const INITIAL_CYCLES_BALANCE: Cycles = Cycles::new(100_000_000_000_000); + +fn setup() -> StateMachine { + StateMachineBuilder::new().with_flexible_ordering().build() +} + +fn install_uc(sm: &StateMachine) -> ic_base_types::CanisterId { + sm.install_canister_with_cycles( + UNIVERSAL_CANISTER_WASM.to_vec(), + vec![], + None, + INITIAL_CYCLES_BALANCE, + ) + .unwrap() +} + +fn get_reply(sm: &StateMachine, msg_id: &ic_types::messages::MessageId) -> Vec { + match sm.ingress_status(msg_id) { + IngressStatus::Known { + state: IngressState::Completed(WasmResult::Reply(bytes)), + .. + } => bytes, + other => panic!("Expected completed reply, got: {:?}", other), + } +} + +#[test] +fn test_basic_inter_canister_ordering() { + let sm = setup(); + let canister_a = install_uc(&sm); + let canister_b = install_uc(&sm); + + let b_reply = wasm().reply_data(b"hello from B").build(); + let a_payload = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply)) + .build(); + + let ingress_id = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_payload, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister_a, ingress_id.clone()), + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + ])); + + assert_eq!(get_reply(&sm, &ingress_id), b"hello from B"); +} + +#[test] +fn test_ingress_ordering_on_same_canister() { + let sm = setup(); + let canister = install_uc(&sm); + + let payload_1 = wasm().set_global_data(b"first").reply_data(b"ok1").build(); + let payload_2 = 
wasm().set_global_data(b"second").reply_data(b"ok2").build(); + + let id1 = sm + .buffer_ingress_as(PrincipalId::new_anonymous(), canister, "update", payload_1) + .unwrap(); + let id2 = sm + .buffer_ingress_as(PrincipalId::new_anonymous(), canister, "update", payload_2) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister, id1.clone()), + OrderedMessage::Ingress(canister, id2.clone()), + ])); + + assert_eq!(get_reply(&sm, &id1), b"ok1"); + assert_eq!(get_reply(&sm, &id2), b"ok2"); + + let read_payload = wasm().get_global_data().append_and_reply().build(); + let result = sm + .execute_ingress(canister, "update", read_payload) + .unwrap(); + match result { + WasmResult::Reply(data) => assert_eq!(data, b"second"), + _ => panic!("Expected reply"), + } +} + +#[test] +fn test_reversed_ingress_ordering() { + let sm = setup(); + let canister = install_uc(&sm); + + let payload_1 = wasm().set_global_data(b"first").reply_data(b"ok1").build(); + let payload_2 = wasm().set_global_data(b"second").reply_data(b"ok2").build(); + + let id1 = sm + .buffer_ingress_as(PrincipalId::new_anonymous(), canister, "update", payload_1) + .unwrap(); + let id2 = sm + .buffer_ingress_as(PrincipalId::new_anonymous(), canister, "update", payload_2) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister, id2.clone()), + OrderedMessage::Ingress(canister, id1.clone()), + ])); + + assert_eq!(get_reply(&sm, &id1), b"ok1"); + assert_eq!(get_reply(&sm, &id2), b"ok2"); + + let read_payload = wasm().get_global_data().append_and_reply().build(); + let result = sm + .execute_ingress(canister, "update", read_payload) + .unwrap(); + match result { + WasmResult::Reply(data) => assert_eq!(data, b"first"), + _ => panic!("Expected reply"), + } +} + +/// A → B → C → B → A chain. 
+#[test] +fn test_three_canister_chain_ordering() { + let sm = setup(); + let canister_a = install_uc(&sm); + let canister_b = install_uc(&sm); + let canister_c = install_uc(&sm); + + let c_reply = wasm().reply_data(b"hello from C").build(); + let b_on_reply = wasm().reply_data(b"hello from C via B").build(); + let b_payload = wasm() + .inter_update( + canister_c, + CallArgs::default().other_side(c_reply).on_reply(b_on_reply), + ) + .build(); + let a_payload = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_payload)) + .build(); + + let ingress_id = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_payload, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister_a, ingress_id.clone()), + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Request { + source: canister_b, + target: canister_c, + }, + OrderedMessage::Response { + source: canister_c, + target: canister_b, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + ])); + + assert_eq!(get_reply(&sm, &ingress_id), b"hello from C via B"); +} + +/// Two independent chains (A→B, C→D) interleaved. 
+#[test] +fn test_interleaved_inter_canister_calls() { + let sm = setup(); + let canister_a = install_uc(&sm); + let canister_b = install_uc(&sm); + let canister_c = install_uc(&sm); + let canister_d = install_uc(&sm); + + let b_reply = wasm().reply_data(b"reply from B").build(); + let a_calls_b = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply)) + .build(); + + let d_reply = wasm().reply_data(b"reply from D").build(); + let c_calls_d = wasm() + .inter_update(canister_d, CallArgs::default().other_side(d_reply)) + .build(); + + let ingress_a = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_calls_b, + ) + .unwrap(); + let ingress_c = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_c, + "update", + c_calls_d, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister_a, ingress_a.clone()), + OrderedMessage::Ingress(canister_c, ingress_c.clone()), + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Request { + source: canister_c, + target: canister_d, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + OrderedMessage::Response { + source: canister_d, + target: canister_c, + }, + ])); + + assert_eq!(get_reply(&sm, &ingress_a), b"reply from B"); + assert_eq!(get_reply(&sm, &ingress_c), b"reply from D"); +} + +/// install_code interleaved with inter-canister calls. 
+#[test] +fn test_subnet_message_ordering() { + let sm = setup(); + let canister_a = install_uc(&sm); + let canister_b = install_uc(&sm); + + let b_reply = wasm().reply_data(b"before upgrade").build(); + let a_calls_b = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply)) + .build(); + + let ingress_a = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_calls_b, + ) + .unwrap(); + + let new_b_wasm = UNIVERSAL_CANISTER_WASM.to_vec(); + let install_args = + InstallCodeArgs::new(CanisterInstallMode::Upgrade, canister_b, new_b_wasm, vec![]); + let install_ingress = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + IC_00, + "install_code", + install_args.encode(), + ) + .unwrap(); + + let b_reply_after = wasm().reply_data(b"after upgrade").build(); + let a_calls_b_again = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply_after)) + .build(); + + let ingress_a2 = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_calls_b_again, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister_a, ingress_a.clone()), + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + OrderedMessage::Ingress(IC_00, install_ingress.clone()), + OrderedMessage::Ingress(canister_a, ingress_a2.clone()), + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + ])); + + assert_eq!(get_reply(&sm, &ingress_a), b"before upgrade"); + match sm.ingress_status(&install_ingress) { + IngressStatus::Known { + state: IngressState::Completed(_), + .. 
+ } => {} + other => panic!("Expected install_code to complete, got: {:?}", other), + } + assert_eq!(get_reply(&sm, &ingress_a2), b"after upgrade"); +} + +/// Canister calls create_canister on IC_00 via inter-canister call. +#[test] +fn test_canister_calls_management_canister() { + let sm = setup(); + let canister_a = install_uc(&sm); + + let on_reply = wasm().message_payload().reply_data_append().reply().build(); + let a_payload = wasm() + .call(management::create_canister(INITIAL_CYCLES_BALANCE.get() / 2).on_reply(on_reply)) + .build(); + + let ingress_a = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_payload, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister_a, ingress_a.clone()), + OrderedMessage::Request { + source: canister_a, + target: IC_00, + }, + OrderedMessage::Response { + source: IC_00, + target: canister_a, + }, + ])); + + let reply = get_reply(&sm, &ingress_a); + let record = CanisterIdRecord::decode(&reply).expect("Failed to decode canister ID"); + assert!( + sm.get_latest_state() + .canister_state(&record.get_canister_id()) + .is_some(), + "Created canister should exist" + ); +} + +/// Two mgmt canister ingress in separate steps — only one completes per step. 
+#[test] +fn test_one_subnet_message_per_round() { + let sm = setup(); + + let args = ProvisionalCreateCanisterWithCyclesArgs { + amount: Some(candid::Nat::from(0_u64)), + settings: None, + specified_id: None, + sender_canister_version: None, + } + .encode(); + + let id1 = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + IC_00, + Method::ProvisionalCreateCanisterWithCycles, + args.clone(), + ) + .unwrap(); + let id2 = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + IC_00, + Method::ProvisionalCreateCanisterWithCycles, + args, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![OrderedMessage::Ingress( + IC_00, + id1.clone(), + )])); + + let is_done = |id: &ic_types::messages::MessageId| { + matches!( + sm.ingress_status(id), + IngressStatus::Known { + state: IngressState::Completed(_), + .. + } + ) + }; + assert!( + is_done(&id1), + "First create_canister should be done: {:?}", + sm.ingress_status(&id1) + ); + assert!( + !is_done(&id2), + "Second create_canister should NOT be done yet: {:?}", + sm.ingress_status(&id2) + ); + + sm.execute_with_ordering(MessageOrdering::new(vec![OrderedMessage::Ingress( + IC_00, + id2.clone(), + )])); + + assert!( + is_done(&id2), + "Second create_canister should be done: {:?}", + sm.ingress_status(&id2) + ); +} + +/// DTS: message sliced across multiple rounds. 
+#[test] +fn test_dts_execution_completes() { + fn slow_wasm() -> Vec { + wat::parse_str( + r#"(module + (import "ic0" "msg_reply" (func $msg_reply)) + (func $run + (local $i i32) + (loop $loop + (local.set $i (i32.add (local.get $i) (i32.const 1))) + (br_if $loop (i32.lt_s (local.get $i) (i32.const 3000000000))) + ) + (call $msg_reply)) + (memory $memory 1) + (export "canister_update run" (func $run)) + )"#, + ) + .unwrap() + } + + let sm = setup(); + let canister = sm.create_canister_with_cycles(None, INITIAL_CYCLES_BALANCE, None); + sm.install_wasm_in_mode(canister, CanisterInstallMode::Install, slow_wasm(), vec![]) + .unwrap(); + + let ingress_id = sm + .buffer_ingress_as(PrincipalId::new_anonymous(), canister, "run", vec![]) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![OrderedMessage::Ingress( + canister, + ingress_id.clone(), + )])); + + match sm.ingress_status(&ingress_id) { + IngressStatus::Known { + state: IngressState::Completed(WasmResult::Reply(_)), + .. + } => {} + other => panic!("Expected DTS message to complete, got: {:?}", other), + } +} + +#[test] +fn test_panics_without_flexible_ordering() { + let sm = StateMachineBuilder::new().build(); + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + sm.execute_with_ordering(MessageOrdering::new(vec![])); + })); + assert!(result.is_err(), "Should panic without flexible ordering"); +} + +/// Two calls A→B, each request/response handled separately. 
+#[test] +fn test_two_calls_same_source_separate_steps() { + let sm = setup(); + let canister_a = install_uc(&sm); + let canister_b = install_uc(&sm); + + let b_reply_1 = wasm().reply_data(b"reply1").build(); + let a_calls_b_1 = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply_1)) + .build(); + let b_reply_2 = wasm().reply_data(b"reply2").build(); + let a_calls_b_2 = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply_2)) + .build(); + + let ingress_a = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_calls_b_1, + ) + .unwrap(); + let ingress_b = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_calls_b_2, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Ingress(canister_a, ingress_a.clone()), + OrderedMessage::Ingress(canister_a, ingress_b.clone()), + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + ])); + + assert_eq!(get_reply(&sm, &ingress_a), b"reply1"); + assert_eq!(get_reply(&sm, &ingress_b), b"reply2"); +} + +#[test] +fn test_alternating_call_response() { + let sm = setup(); + let canister_a = install_uc(&sm); + let canister_b = install_uc(&sm); + + let b_reply_1 = wasm().reply_data(b"first").build(); + let a_calls_b_1 = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply_1)) + .build(); + let b_reply_2 = wasm().reply_data(b"second").build(); + let a_calls_b_2 = wasm() + .inter_update(canister_b, CallArgs::default().other_side(b_reply_2)) + .build(); + + let ingress_a = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_calls_b_1, + ) + .unwrap(); + let ingress_b = sm + 
+        .buffer_ingress_as(
+            PrincipalId::new_anonymous(),
+            canister_a,
+            "update",
+            a_calls_b_2,
+        )
+        .unwrap();
+
+    sm.execute_with_ordering(MessageOrdering::new(vec![
+        OrderedMessage::Ingress(canister_a, ingress_a.clone()),
+        OrderedMessage::Request {
+            source: canister_a,
+            target: canister_b,
+        },
+        OrderedMessage::Response {
+            source: canister_b,
+            target: canister_a,
+        },
+        OrderedMessage::Ingress(canister_a, ingress_b.clone()),
+        OrderedMessage::Request {
+            source: canister_a,
+            target: canister_b,
+        },
+        OrderedMessage::Response {
+            source: canister_b,
+            target: canister_a,
+        },
+    ]));
+
+    assert_eq!(get_reply(&sm, &ingress_a), b"first");
+    assert_eq!(get_reply(&sm, &ingress_b), b"second");
+}
+
+#[test]
+fn test_impossible_ordering_response_from_uninvolved() {
+    let sm = setup();
+    let canister_a = install_uc(&sm);
+    let canister_b = install_uc(&sm);
+    let canister_c = install_uc(&sm);
+
+    let b_reply = wasm().reply_data(b"hi").build();
+    let a_payload = wasm()
+        .inter_update(canister_b, CallArgs::default().other_side(b_reply))
+        .build();
+    let ingress_id = sm
+        .buffer_ingress_as(
+            PrincipalId::new_anonymous(),
+            canister_a,
+            "update",
+            a_payload,
+        )
+        .unwrap();
+
+    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+        sm.execute_with_ordering(MessageOrdering::new(vec![
+            OrderedMessage::Ingress(canister_a, ingress_id.clone()),
+            OrderedMessage::Response {
+                source: canister_c,
+                target: canister_a,
+            },
+        ]));
+    }));
+    assert!(
+        result.is_err(),
+        "Should panic: response from uninvolved canister"
+    );
+}
+
+#[test]
+fn test_impossible_ordering_wrong_source() {
+    let sm = setup();
+    let canister_a = install_uc(&sm);
+    let canister_b = install_uc(&sm);
+    let canister_c = install_uc(&sm);
+
+    let b_reply = wasm().reply_data(b"hi").build();
+    let a_payload = wasm()
+        .inter_update(canister_b, CallArgs::default().other_side(b_reply))
+        .build();
+    let ingress_id = sm
+        .buffer_ingress_as(
+            PrincipalId::new_anonymous(),
+            canister_a,
+            "update",
+            a_payload,
+        )
+        .unwrap();
+
+    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+        sm.execute_with_ordering(MessageOrdering::new(vec![
+            OrderedMessage::Ingress(canister_a, ingress_id.clone()),
+            OrderedMessage::Request {
+                source: canister_c,
+                target: canister_b,
+            },
+        ]));
+    }));
+    assert!(result.is_err(), "Should panic: wrong source canister");
+}
+
+#[test]
+fn test_impossible_ordering_no_messages() {
+    let sm = setup();
+    let canister_a = install_uc(&sm);
+    let canister_b = install_uc(&sm);
+
+    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+        sm.execute_with_ordering(MessageOrdering::new(vec![OrderedMessage::Request {
+            source: canister_a,
+            target: canister_b,
+        }]));
+    }));
+    assert!(result.is_err(), "Should panic: no messages in queue");
+}
+
+#[test]
+fn test_request_with_heartbeat() {
+    fn heartbeat_wasm() -> Vec<u8> {
+        wat::parse_str(
+            r#"(module
+            (import "ic0" "msg_reply" (func $msg_reply))
+            (import "ic0" "msg_reply_data_append" (func $msg_reply_data_append (param i32 i32)))
+            (func $heartbeat
+                (i32.store (i32.const 0)
+                    (i32.add (i32.load (i32.const 0)) (i32.const 1))))
+            (func $read
+                (call $msg_reply_data_append (i32.const 0) (i32.const 4))
+                (call $msg_reply))
+            (memory 1)
+            (export "canister_heartbeat" (func $heartbeat))
+            (export "canister_update read" (func $read))
+            (export "canister_query read_query" (func $read))
+        )"#,
+        )
+        .unwrap()
+    }
+
+    let sm = setup();
+    let canister_a = install_uc(&sm);
+    let canister_b = sm.create_canister_with_cycles(None, INITIAL_CYCLES_BALANCE, None);
+    sm.install_wasm_in_mode(
+        canister_b,
+        CanisterInstallMode::Install,
+        heartbeat_wasm(),
+        vec![],
+    )
+    .unwrap();
+
+    let baseline = sm.query(canister_b, "read_query", vec![]).unwrap();
+    let baseline = u32::from_le_bytes(baseline.bytes()[..4].try_into().unwrap());
+
+    let a_payload = wasm()
+        .call_simple(canister_b, "read", CallArgs::default())
+        .build();
+    let ingress_a = sm
+        .buffer_ingress_as(
+            PrincipalId::new_anonymous(),
+            canister_a,
+            "update",
+            a_payload,
+        )
+        .unwrap();
+
+    sm.execute_with_ordering(MessageOrdering::new(vec![
+        OrderedMessage::Ingress(canister_a, ingress_a.clone()),
+        OrderedMessage::Heartbeat(canister_b),
+        OrderedMessage::Request {
+            source: canister_a,
+            target: canister_b,
+        },
+        OrderedMessage::Response {
+            source: canister_b,
+            target: canister_a,
+        },
+    ]));
+
+    let reply = get_reply(&sm, &ingress_a);
+    let counter = u32::from_le_bytes(reply[..4].try_into().unwrap());
+    assert_eq!(
+        counter - baseline,
+        1,
+        "Heartbeat should have run exactly once during ordering (baseline={}, after={})",
+        baseline,
+        counter
+    );
+}
+
+#[test]
+fn test_request_with_timer() {
+    fn timer_wasm() -> Vec<u8> {
+        wat::parse_str(
+            r#"(module
+            (import "ic0" "msg_reply" (func $msg_reply))
+            (import "ic0" "msg_reply_data_append" (func $msg_reply_data_append (param i32 i32)))
+            (import "ic0" "global_timer_set" (func $global_timer_set (param i64) (result i64)))
+            (func $init
+                (drop (call $global_timer_set (i64.const 1))))
+            (func $timer
+                (i32.store (i32.const 0)
+                    (i32.add (i32.load (i32.const 0)) (i32.const 1)))
+                (drop (call $global_timer_set (i64.const 1))))
+            (func $read
+                (call $msg_reply_data_append (i32.const 0) (i32.const 4))
+                (call $msg_reply))
+            (memory 1)
+            (export "canister_init" (func $init))
+            (export "canister_global_timer" (func $timer))
+            (export "canister_update read" (func $read))
+            (export "canister_query read_query" (func $read))
+        )"#,
+        )
+        .unwrap()
+    }
+
+    let sm = setup();
+    let canister_a = install_uc(&sm);
+    let canister_b = sm.create_canister_with_cycles(None, INITIAL_CYCLES_BALANCE, None);
+    sm.install_wasm_in_mode(
+        canister_b,
+        CanisterInstallMode::Install,
+        timer_wasm(),
+        vec![],
+    )
+    .unwrap();
+
+    // Read baseline counter from wasm memory (heartbeat may fire during install).
+    let baseline = {
+        let state = sm.get_latest_state();
+        let es = state
+            .canister_state(&canister_b)
+            .unwrap()
+            .execution_state
+            .as_ref()
+            .unwrap();
+        let page = es
+            .wasm_memory
+            .page_map
+            .get_page(ic_replicated_state::PageIndex::new(0));
+        u32::from_le_bytes(page[..4].try_into().unwrap())
+    };
+
+    let a_payload = wasm()
+        .call_simple(canister_b, "read", CallArgs::default())
+        .build();
+    let ingress_a = sm
+        .buffer_ingress_as(
+            PrincipalId::new_anonymous(),
+            canister_a,
+            "update",
+            a_payload,
+        )
+        .unwrap();
+
+    sm.execute_with_ordering(MessageOrdering::new(vec![
+        OrderedMessage::Ingress(canister_a, ingress_a.clone()),
+        OrderedMessage::Timer(canister_b),
+        OrderedMessage::Request {
+            source: canister_a,
+            target: canister_b,
+        },
+        OrderedMessage::Response {
+            source: canister_b,
+            target: canister_a,
+        },
+    ]));
+
+    let reply = get_reply(&sm, &ingress_a);
+    let counter = u32::from_le_bytes(reply[..4].try_into().unwrap());
+    assert_eq!(
+        counter - baseline,
+        1,
+        "Timer should have fired exactly once during ordering (baseline={}, after={})",
+        baseline,
+        counter
+    );
+}
+
+#[test]
+fn test_self_call() {
+    let sm = setup();
+    let canister = install_uc(&sm);
+
+    let self_reply = wasm().reply_data(b"self-reply").build();
+    let payload = wasm()
+        .inter_update(canister, CallArgs::default().other_side(self_reply))
+        .build();
+
+    let ingress_id = sm
+        .buffer_ingress_as(PrincipalId::new_anonymous(), canister, "update", payload)
+        .unwrap();
+
+    sm.execute_with_ordering(MessageOrdering::new(vec![
+        OrderedMessage::Ingress(canister, ingress_id.clone()),
+        OrderedMessage::Request {
+            source: canister,
+            target: canister,
+        },
+        OrderedMessage::Response {
+            source: canister,
+            target: canister,
+        },
+    ]));
+
+    assert_eq!(get_reply(&sm, &ingress_id), b"self-reply");
+}
+
+#[test]
+fn test_heartbeat_then_request() {
+    fn heartbeat_wasm() -> Vec<u8> {
+        wat::parse_str(
+            r#"(module
+            (import "ic0" "msg_reply" (func $msg_reply))
+            (import "ic0"
"msg_reply_data_append" (func $msg_reply_data_append (param i32 i32))) + (func $heartbeat + (i32.store (i32.const 0) + (i32.add (i32.load (i32.const 0)) (i32.const 1)))) + (func $read + (call $msg_reply_data_append (i32.const 0) (i32.const 4)) + (call $msg_reply)) + (memory 1) + (export "canister_heartbeat" (func $heartbeat)) + (export "canister_update read" (func $read)) + (export "canister_query read_query" (func $read)) + )"#, + ) + .unwrap() + } + + let sm = setup(); + let canister_a = install_uc(&sm); + let canister_b = sm.create_canister_with_cycles(None, INITIAL_CYCLES_BALANCE, None); + sm.install_wasm_in_mode( + canister_b, + CanisterInstallMode::Install, + heartbeat_wasm(), + vec![], + ) + .unwrap(); + + let baseline = sm.query(canister_b, "read_query", vec![]).unwrap(); + let baseline = u32::from_le_bytes(baseline.bytes()[..4].try_into().unwrap()); + + let a_payload = wasm() + .call_simple(canister_b, "read", CallArgs::default()) + .build(); + let ingress_id = sm + .buffer_ingress_as( + PrincipalId::new_anonymous(), + canister_a, + "update", + a_payload, + ) + .unwrap(); + + sm.execute_with_ordering(MessageOrdering::new(vec![ + OrderedMessage::Heartbeat(canister_b), + OrderedMessage::Ingress(canister_a, ingress_id.clone()), + OrderedMessage::Request { + source: canister_a, + target: canister_b, + }, + OrderedMessage::Response { + source: canister_b, + target: canister_a, + }, + ])); + + let reply = get_reply(&sm, &ingress_id); + let counter = u32::from_le_bytes(reply[..4].try_into().unwrap()); + assert_eq!( + counter - baseline, + 1, + "Heartbeat should have run exactly once during ordering (baseline={}, after={})", + baseline, + counter + ); +} diff --git a/rs/state_machine_tests/tests/mod.rs b/rs/state_machine_tests/tests/mod.rs index 65b2bffccba9..fb15267bfb65 100644 --- a/rs/state_machine_tests/tests/mod.rs +++ b/rs/state_machine_tests/tests/mod.rs @@ -1,3 +1,4 @@ mod dts; +mod flexible_ordering; mod multi_subnet; mod reject_remote_callbacks;