Skip to content
Draft

wip #9650

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ca29a48
wip
venkkatesh-sekar Mar 20, 2026
ff973ae
test
venkkatesh-sekar Mar 20, 2026
c38d823
test
venkkatesh-sekar Mar 20, 2026
5058129
test
venkkatesh-sekar Mar 20, 2026
644e63a
as
venkkatesh-sekar Mar 20, 2026
b7d9934
as
venkkatesh-sekar Mar 20, 2026
7999e44
as
venkkatesh-sekar Mar 20, 2026
c5472d9
as
venkkatesh-sekar Mar 20, 2026
096bad4
as
venkkatesh-sekar Mar 20, 2026
228ed68
as
venkkatesh-sekar Mar 20, 2026
7a6b1b2
as
venkkatesh-sekar Mar 20, 2026
a59c9fb
as
venkkatesh-sekar Mar 20, 2026
c9c8349
Merge branch 'master' into vsekar/fmo
venkkatesh-sekar Mar 20, 2026
4bae9dd
as
venkkatesh-sekar Mar 20, 2026
4707aaa
test
venkkatesh-sekar Mar 20, 2026
2d423ae
Merge branch 'master' into vsekar/fmo
venkkatesh-sekar Mar 26, 2026
30dd611
test
venkkatesh-sekar Mar 26, 2026
c8d5796
test
venkkatesh-sekar Mar 26, 2026
a3db307
test
venkkatesh-sekar Mar 26, 2026
aef1fc5
test
venkkatesh-sekar Mar 26, 2026
73ffcb6
flush
venkkatesh-sekar Mar 26, 2026
fc1db26
v1
venkkatesh-sekar Mar 26, 2026
d33cb72
Merge branch 'master' into vsekar/fmo
venkkatesh-sekar Mar 26, 2026
d71935a
remove ordering
venkkatesh-sekar Mar 27, 2026
2a07aac
make queue check strong
venkkatesh-sekar Mar 27, 2026
b3a4ec5
clean
venkkatesh-sekar Mar 27, 2026
1b4f387
clean
venkkatesh-sekar Mar 27, 2026
14351de
fix
venkkatesh-sekar Mar 27, 2026
ec488ac
test
venkkatesh-sekar Mar 27, 2026
e9ff57c
test
venkkatesh-sekar Mar 27, 2026
164b93d
test
venkkatesh-sekar Mar 27, 2026
d315527
clean budget
venkkatesh-sekar Mar 27, 2026
bab9a43
Merge branch 'master' into vsekar/fmo
venkkatesh-sekar Mar 27, 2026
b195890
test
venkkatesh-sekar Mar 27, 2026
ed8f333
test
venkkatesh-sekar Mar 28, 2026
1c166f5
test
venkkatesh-sekar Mar 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions packages/pocket-ic/src/common/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ pub struct InstanceConfig {
pub incomplete_state: Option<IncompleteStateFlag>,
pub initial_time: Option<InitialTime>,
pub mainnet_nns_subnet_id: Option<bool>,
pub flexible_ordering: Option<bool>,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, Default, JsonSchema)]
Expand Down Expand Up @@ -1238,3 +1239,59 @@ pub struct RawCanisterSnapshotId {
#[serde(serialize_with = "base64::serialize")]
pub snapshot_id: Vec<u8>,
}

// ================================================================================================================= //
// Flexible message ordering types
// ================================================================================================================= //

#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)]
pub enum RawOrderedMessage {
Ingress {
canister_id: RawCanisterId,
#[serde(deserialize_with = "base64::deserialize")]
#[serde(serialize_with = "base64::serialize")]
message_id: Vec<u8>,
},
Request {
source: RawCanisterId,
target: RawCanisterId,
},
Response {
source: RawCanisterId,
target: RawCanisterId,
},
Heartbeat {
canister_id: RawCanisterId,
},
Timer {
canister_id: RawCanisterId,
},
}

#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)]
pub struct RawMessageOrdering {
pub subnet_id: RawSubnetId,
pub messages: Vec<RawOrderedMessage>,
}

#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)]
pub struct RawBufferIngressMessage {
pub subnet_id: RawSubnetId,
#[serde(deserialize_with = "base64::deserialize")]
#[serde(serialize_with = "base64::serialize")]
pub sender: Vec<u8>,
#[serde(deserialize_with = "base64::deserialize")]
#[serde(serialize_with = "base64::serialize")]
pub canister_id: Vec<u8>,
pub method: String,
#[serde(deserialize_with = "base64::deserialize")]
#[serde(serialize_with = "base64::serialize")]
pub payload: Vec<u8>,
}

#[derive(Clone, Serialize, Deserialize, Debug, JsonSchema)]
pub struct RawBufferedIngressId {
#[serde(deserialize_with = "base64::deserialize")]
#[serde(serialize_with = "base64::serialize")]
pub message_id: Vec<u8>,
}
43 changes: 43 additions & 0 deletions packages/pocket-ic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub struct PocketIcBuilder {
icp_features: IcpFeatures,
initial_time: Option<InitialTime>,
mainnet_nns_subnet_id: Option<bool>,
flexible_ordering: bool,
}

#[allow(clippy::new_without_default)]
Expand All @@ -195,6 +196,7 @@ impl PocketIcBuilder {
icp_features: IcpFeatures::default(),
initial_time: None,
mainnet_nns_subnet_id: None,
flexible_ordering: false,
}
}

Expand All @@ -220,6 +222,7 @@ impl PocketIcBuilder {
self.initial_time,
self.http_gateway_config,
self.mainnet_nns_subnet_id,
self.flexible_ordering,
)
}

Expand All @@ -239,6 +242,7 @@ impl PocketIcBuilder {
self.initial_time,
self.http_gateway_config,
self.mainnet_nns_subnet_id,
self.flexible_ordering,
)
.await
}
Expand Down Expand Up @@ -495,6 +499,12 @@ impl PocketIcBuilder {
self.mainnet_nns_subnet_id = Some(true);
self
}

/// Enable flexible message ordering on all subnets.
pub fn with_flexible_ordering(mut self) -> Self {
self.flexible_ordering = true;
self
}
}

/// Representation of system time as duration since UNIX epoch
Expand Down Expand Up @@ -615,6 +625,7 @@ impl PocketIc {
initial_time: Option<InitialTime>,
http_gateway_config: Option<InstanceHttpGatewayConfig>,
mainnet_nns_subnet_id: Option<bool>,
flexible_ordering: bool,
) -> Self {
let (tx, rx) = channel();
let thread = thread::spawn(move || {
Expand Down Expand Up @@ -642,6 +653,7 @@ impl PocketIc {
initial_time,
http_gateway_config,
mainnet_nns_subnet_id,
flexible_ordering,
)
.await
});
Expand Down Expand Up @@ -938,6 +950,37 @@ impl PocketIc {
})
}

/// Buffer an ingress message on a specific subnet for later ordered execution.
pub fn buffer_ingress(
&self,
subnet_id: Principal,
sender: Principal,
canister_id: CanisterId,
method: &str,
payload: Vec<u8>,
) -> Vec<u8> {
let runtime = self.runtime.clone();
runtime.block_on(async {
self.pocket_ic
.buffer_ingress(subnet_id, sender, canister_id, method, payload)
.await
})
}

/// Execute messages in a specified order on a given subnet.
pub fn execute_with_ordering(
&self,
subnet_id: Principal,
messages: Vec<common::rest::RawOrderedMessage>,
) {
let runtime = self.runtime.clone();
runtime.block_on(async {
self.pocket_ic
.execute_with_ordering(subnet_id, messages)
.await
})
}

/// Await an update call submitted previously by `submit_call` or `submit_call_with_effective_principal`.
pub fn await_call(&self, message_id: RawMessageId) -> Result<Vec<u8>, RejectResponse> {
let runtime = self.runtime.clone();
Expand Down
46 changes: 43 additions & 3 deletions packages/pocket-ic/src/nonblocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use crate::common::rest::{
CreateHttpGatewayResponse, CreateInstanceResponse, ExtendedSubnetConfigSet, HttpGatewayBackend,
HttpGatewayConfig, HttpGatewayInfo, HttpsConfig, IcpConfig, IcpFeatures, InitialTime,
InstanceConfig, InstanceHttpGatewayConfig, InstanceId, MockCanisterHttpResponse, RawAddCycles,
RawCanisterCall, RawCanisterHttpRequest, RawCanisterId, RawCanisterResult,
RawCanisterSnapshotDownload, RawCanisterSnapshotId, RawCanisterSnapshotUpload, RawCycles,
RawEffectivePrincipal, RawIngressStatusArgs, RawMessageId, RawMockCanisterHttpResponse,
RawBufferIngressMessage, RawBufferedIngressId, RawCanisterCall, RawCanisterHttpRequest,
RawCanisterId, RawCanisterResult, RawCanisterSnapshotDownload, RawCanisterSnapshotId,
RawCanisterSnapshotUpload, RawCycles, RawEffectivePrincipal, RawIngressStatusArgs,
RawMessageId, RawMessageOrdering, RawMockCanisterHttpResponse, RawOrderedMessage,
RawPrincipalId, RawSetStableMemory, RawStableMemory, RawSubnetId, RawTickConfigs, RawTime,
RawVerifyCanisterSigArg, SubnetId, Topology,
};
Expand Down Expand Up @@ -148,6 +149,7 @@ impl PocketIc {
initial_time: Option<InitialTime>,
http_gateway_config: Option<InstanceHttpGatewayConfig>,
mainnet_nns_subnet_id: Option<bool>,
flexible_ordering: bool,
) -> Self {
let server_url = if let Some(server_url) = server_url {
server_url
Expand Down Expand Up @@ -204,6 +206,7 @@ impl PocketIc {
incomplete_state: None,
initial_time,
mainnet_nns_subnet_id,
flexible_ordering: if flexible_ordering { Some(true) } else { None },
};

let test_driver_pid = std::process::id();
Expand Down Expand Up @@ -689,6 +692,43 @@ impl PocketIc {
self.post(endpoint, raw_canister_call).await
}

/// Buffer an ingress message on a specific subnet for later ordered execution.
pub async fn buffer_ingress(
&self,
subnet_id: Principal,
sender: Principal,
canister_id: CanisterId,
method: &str,
payload: Vec<u8>,
) -> Vec<u8> {
let raw = RawBufferIngressMessage {
subnet_id: RawSubnetId {
subnet_id: subnet_id.as_slice().to_vec(),
},
sender: sender.as_slice().to_vec(),
canister_id: canister_id.as_slice().to_vec(),
method: method.to_string(),
payload,
};
let result: RawBufferedIngressId = self.post("update/buffer_ingress_message", raw).await;
result.message_id
}

/// Execute messages in a specified order on a given subnet.
pub async fn execute_with_ordering(
&self,
subnet_id: Principal,
messages: Vec<RawOrderedMessage>,
) {
let raw = RawMessageOrdering {
subnet_id: RawSubnetId {
subnet_id: subnet_id.as_slice().to_vec(),
},
messages,
};
let _: () = self.post("update/execute_with_ordering", raw).await;
}

/// Await an update call submitted previously by `submit_call` or `submit_call_with_effective_principal`.
pub async fn await_call(&self, message_id: RawMessageId) -> Result<Vec<u8>, RejectResponse> {
let endpoint = "update/await_ingress_message";
Expand Down
63 changes: 63 additions & 0 deletions packages/pocket-ic/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3435,3 +3435,66 @@ fn cloud_engine_default_effective_canister_id() {
topology.default_effective_canister_id.clone().into();
assert_eq!(effective_canister_id, default_effective_canister_id);
}

#[test]
fn test_flexible_ordering_basic() {
use pocket_ic::common::rest::{RawCanisterId, RawOrderedMessage};

let pic = PocketIcBuilder::new()
.with_application_subnet()
.with_flexible_ordering()
.build();

let topology = pic.topology();
let subnet_id = topology.get_app_subnets()[0];

let canister_id = pic.create_canister_on_subnet(None, None, subnet_id);
pic.add_cycles(canister_id, INIT_CYCLES);
pic.install_canister(canister_id, counter_wasm(), vec![], None);

// Buffer two write calls.
let msg_id_1 = pic.buffer_ingress(
subnet_id,
Principal::anonymous(),
canister_id,
"write",
encode_one(()).unwrap(),
);
let msg_id_2 = pic.buffer_ingress(
subnet_id,
Principal::anonymous(),
canister_id,
"write",
encode_one(()).unwrap(),
);

// Execute them in order via flexible ordering.
pic.execute_with_ordering(
subnet_id,
vec![
RawOrderedMessage::Ingress {
canister_id: RawCanisterId {
canister_id: canister_id.as_slice().to_vec(),
},
message_id: msg_id_1,
},
RawOrderedMessage::Ingress {
canister_id: RawCanisterId {
canister_id: canister_id.as_slice().to_vec(),
},
message_id: msg_id_2,
},
],
);

// Counter should be 2 (two writes executed).
let reply = pic
.query_call(
canister_id,
Principal::anonymous(),
"read",
encode_one(()).unwrap(),
)
.unwrap();
assert_eq!(reply, vec![2, 0, 0, 0]);
}
29 changes: 24 additions & 5 deletions rs/config/src/subnet_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ const INSTRUCTION_OVERHEAD_PER_CANISTER_FOR_FINALIZATION: NumInstructions =
// would cause the longest possible round of 4B instructions or 2 seconds.
const MAX_INSTRUCTIONS_PER_ROUND: NumInstructions = NumInstructions::new(4 * B);

/// Ideally we would split the per-round limit between subnet messages and
/// canister messages, so that their sum cannot exceed the limit. That would
/// make the limit for canister messages variable, which would break assumptions
/// of the scheduling algorithm. The next best thing we can do is to limit
/// subnet messages on top of the fixed limit for canister messages.
/// The value of the limit for subnet messages is chosen quite arbitrarily
/// as 1/16 of the fixed limit. Any other value in the same ballpark would
/// work here.
pub const SUBNET_MESSAGES_LIMIT_FRACTION: u64 = 16;

// Limit per `install_code` message. It's bigger than the limit for a regular
// update call to allow for canisters with bigger state to be upgraded.
const MAX_INSTRUCTIONS_PER_INSTALL_CODE: NumInstructions = NumInstructions::new(300 * B);
Expand Down Expand Up @@ -274,6 +284,10 @@ pub struct SchedulerConfig {

/// Number of instructions to count when uploading or downloading binary snapshot data.
pub canister_snapshot_data_baseline_instructions: NumInstructions,

/// Per-round subnet message instruction budget.
/// Defaults to `max_instructions_per_round / SUBNET_MESSAGES_LIMIT_FRACTION`.
pub subnet_messages_per_round_instruction_limit: NumInstructions,
}

impl SchedulerConfig {
Expand Down Expand Up @@ -305,6 +319,8 @@ impl SchedulerConfig {
DEFAULT_CANISTERS_SNAPSHOT_BASELINE_INSTRUCTIONS,
canister_snapshot_data_baseline_instructions:
DEFAULT_CANISTERS_SNAPSHOT_DATA_BASELINE_INSTRUCTIONS,
subnet_messages_per_round_instruction_limit: MAX_INSTRUCTIONS_PER_ROUND
/ SUBNET_MESSAGES_LIMIT_FRACTION,
}
}

Expand All @@ -314,18 +330,19 @@ impl SchedulerConfig {
let max_instructions_per_install_code = NumInstructions::from(1_000 * B);
let max_instructions_per_slice = NumInstructions::from(2 * B);
let max_instructions_per_install_code_slice = NumInstructions::from(5 * B);
// Round limit is set to allow on average 2B instructions.
// See also comment about `MAX_INSTRUCTIONS_PER_ROUND`.
let max_instructions_per_round = max_instructions_per_slice
.max(max_instructions_per_install_code_slice)
+ NumInstructions::from(2 * B);
Self {
scheduler_cores: NUMBER_OF_EXECUTION_THREADS,
max_paused_executions: MAX_PAUSED_EXECUTIONS,
subnet_heap_delta_capacity: SUBNET_HEAP_DELTA_CAPACITY,
// TODO(RUN-993): Enable heap delta rate limiting for system subnets.
// Setting initial reserve to capacity effectively disables the rate limiting.
heap_delta_initial_reserve: SUBNET_HEAP_DELTA_CAPACITY,
// Round limit is set to allow on average 2B instructions.
// See also comment about `MAX_INSTRUCTIONS_PER_ROUND`.
max_instructions_per_round: max_instructions_per_slice
.max(max_instructions_per_install_code_slice)
+ NumInstructions::from(2 * B),
max_instructions_per_round,
max_instructions_per_message,
max_instructions_per_query_message,
max_instructions_per_slice,
Expand All @@ -349,6 +366,8 @@ impl SchedulerConfig {
upload_wasm_chunk_instructions: NumInstructions::from(0),
canister_snapshot_baseline_instructions: NumInstructions::from(0),
canister_snapshot_data_baseline_instructions: NumInstructions::from(0),
subnet_messages_per_round_instruction_limit: max_instructions_per_round
/ SUBNET_MESSAGES_LIMIT_FRACTION,
}
}

Expand Down
Loading
Loading