Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 179 additions & 97 deletions rs/https_outcalls/consensus/src/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use crate::{
payload_builder::{
parse::bytes_to_payload,
utils::{
estimate_response_with_consensus_size, find_flexible_responses,
FlexibleFindResult, estimate_response_with_consensus_size, find_flexible_result,
find_fully_replicated_response, find_non_replicated_response,
group_shares_by_callback_id, grouped_shares_meet_divergence_criteria,
validate_flexible_response_with_proof, validate_response_share,
},
},
};
Expand Down Expand Up @@ -38,7 +39,8 @@ use ic_replicated_state::ReplicatedState;
use ic_types::{
CountBytes, Height, NodeId, NumBytes, RegistryVersion, SubnetId,
batch::{
CanisterHttpPayload, ConsensusResponse, FlexibleCanisterHttpResponses,
CanisterHttpPayload, ConsensusResponse, FlexibleCanisterHttpError,
FlexibleCanisterHttpResponseWithProof, FlexibleCanisterHttpResponses,
MAX_CANISTER_HTTP_PAYLOAD_SIZE, ValidationContext,
},
canister_http::{
Expand All @@ -47,7 +49,7 @@ use ic_types::{
CanisterHttpResponseMetadata, CanisterHttpResponseWithConsensus, Replication,
},
consensus::Committee,
crypto::{Signed, crypto_hash},
crypto::Signed,
messages::{CallbackId, Payload, RejectContext},
registry::RegistryClientError,
signature::BasicSignature,
Expand Down Expand Up @@ -217,6 +219,7 @@ impl CanisterHttpPayloadBuilderImpl {
let mut timeouts = vec![];
let mut divergence_responses = vec![];
let mut flexible_responses = vec![];
let mut flexible_errors = vec![];

// Metrics counters
let mut total_share_count = 0;
Expand Down Expand Up @@ -257,14 +260,26 @@ impl CanisterHttpPayloadBuilderImpl {
continue;
}
if request.time + CANISTER_HTTP_TIMEOUT_INTERVAL < validation_context.time {
let candidate_size = callback_id.count_bytes();
let size = NumBytes::new((accumulated_size + candidate_size) as u64);
if size < max_payload_size {
timeouts.push(*callback_id);
accumulated_size += candidate_size;
// Because timeouts are very cheap to verify, they are
// not counted as responses (so that they are irrelevant
// for the CANISTER_HTTP_MAX_RESPONSES_PER_BLOCK limit.
// Because timeouts are very cheap to verify, they are
// not counted as responses (so that they are irrelevant
// for the CANISTER_HTTP_MAX_RESPONSES_PER_BLOCK limit.
if matches!(request.replication, Replication::Flexible { .. }) {
let error = FlexibleCanisterHttpError::Timeout {
callback_id: *callback_id,
};
let candidate_size = error.count_bytes();
let size = NumBytes::new((accumulated_size + candidate_size) as u64);
if size < max_payload_size {
flexible_errors.push(error);
accumulated_size += candidate_size;
}
} else {
let candidate_size = callback_id.count_bytes();
let size = NumBytes::new((accumulated_size + candidate_size) as u64);
if size < max_payload_size {
timeouts.push(*callback_id);
accumulated_size += candidate_size;
}
}
continue;
}
Expand Down Expand Up @@ -329,22 +344,33 @@ impl CanisterHttpPayloadBuilderImpl {
committee,
min_responses,
max_responses,
} => {
if let Some((group, group_size)) = find_flexible_responses(
*callback_id,
grouped_shares,
committee,
*min_responses,
*max_responses,
accumulated_size,
max_payload_size,
&*pool_access,
) {
} => match find_flexible_result(
*callback_id,
grouped_shares,
committee,
*min_responses,
*max_responses,
accumulated_size,
max_payload_size,
&*pool_access,
) {
FlexibleFindResult::OkResponses(group, group_size) => {
// Note: budget tracking w.r.t. `max_payload_size`
// is done directly in `find_flexible_result`.
flexible_responses.push(group);
responses_included += 1;
accumulated_size += group_size;
}
}
FlexibleFindResult::Error(error, error_size) => {
let size = NumBytes::new((accumulated_size + error_size) as u64);
if size < max_payload_size {
flexible_errors.push(error);
responses_included += 1;
accumulated_size += error_size;
}
}
FlexibleFindResult::Pending => {}
},
}
}
}
Expand All @@ -359,7 +385,7 @@ impl CanisterHttpPayloadBuilderImpl {
timeouts,
divergence_responses,
flexible_responses,
flexible_errors: vec![],
flexible_errors,
}
}

Expand Down Expand Up @@ -642,28 +668,19 @@ impl CanisterHttpPayloadBuilderImpl {

let mut seen_signers = HashSet::new();

for entry in &group.responses {
// Callback id consistency
if entry.response.id != callback_id {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleCallbackIdMismatch {
callback_id,
mismatched_id: entry.response.id,
},
);
}
if entry.proof.content.id != callback_id {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleCallbackIdMismatch {
callback_id,
mismatched_id: entry.proof.content.id,
},
);
}
for response_with_proof in &group.responses {
validate_flexible_response_with_proof(
response_with_proof,
callback_id,
flex_committee,
&mut seen_signers,
consensus_registry_version,
&*self.crypto,
)
.map_err(CanisterHttpPayloadValidationError::InvalidArtifact)?;

// Rejects are not allowed in flexible ok-responses
if matches!(
entry.response.content,
response_with_proof.response.content,
CanisterHttpResponseContent::Reject(_)
) {
return invalid_artifact(
Expand All @@ -672,68 +689,133 @@ impl CanisterHttpPayloadBuilderImpl {
},
);
}
}
}

// No duplicate signers
let signer = entry.proof.signature.signer;
if !seen_signers.insert(signer) {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleDuplicateSigner {
callback_id,
signer,
},
);
}
// Validate flexible errors
for error in &payload.flexible_errors {
let callback_id = error.callback_id();

// Signer must be in the flexible committee
if !flex_committee.contains(&signer) {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleSignerNotInCommittee {
if !delivered_ids.insert(callback_id) {
return invalid_artifact(InvalidCanisterHttpPayloadReason::DuplicateResponse(
callback_id,
));
}

// Look up the request context and verify it's a Flexible replication
let context = http_contexts.get(&callback_id).ok_or(
CanisterHttpPayloadValidationError::InvalidArtifact(
InvalidCanisterHttpPayloadReason::UnknownCallbackId(callback_id),
),
)?;
let Replication::Flexible {
committee: flex_committee,
min_responses,
..
} = &context.replication
else {
return invalid_artifact(InvalidCanisterHttpPayloadReason::InvalidPayloadSection(
callback_id,
));
};

match error {
FlexibleCanisterHttpError::Timeout { .. } => {
if context.time + CANISTER_HTTP_TIMEOUT_INTERVAL >= validation_context.time {
return invalid_artifact(InvalidCanisterHttpPayloadReason::NotTimedOut(
callback_id,
signer,
},
);
));
}
}
FlexibleCanisterHttpError::TooManyRequestErrors {
reject_responses, ..
} => {
let mut seen_signers = HashSet::new();

for response_with_proof in reject_responses {
validate_flexible_response_with_proof(
response_with_proof,
callback_id,
flex_committee,
&mut seen_signers,
consensus_registry_version,
&*self.crypto,
)
.map_err(CanisterHttpPayloadValidationError::InvalidArtifact)?;

// Content hash must match
let calculated_hash = crypto_hash(&entry.response);
if calculated_hash != entry.proof.content.content_hash {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::ContentHashMismatch {
metadata_hash: entry.proof.content.content_hash.clone(),
calculated_hash,
},
);
}
if !matches!(
response_with_proof.response.content,
CanisterHttpResponseContent::Reject(_)
) {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleRejectExpectedInErrorResponse(
callback_id,
),
);
}
}

// Content size must match
let calculated_size = entry.response.content.count_bytes() as u32;
if calculated_size != entry.proof.content.content_size {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::ContentSizeMismatch {
metadata_size: entry.proof.content.content_size,
calculated_size,
},
);
let max_allowed_rejects =
flex_committee.len().saturating_sub(*min_responses as usize);
if reject_responses.len() <= max_allowed_rejects {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleInsufficientRejectCount {
callback_id,
reject_count: reject_responses.len(),
min_needed: max_allowed_rejects + 1,
},
);
}
}
FlexibleCanisterHttpError::ResponsesTooLarge {
metadata_shares, ..
} => {
let mut seen_signers = HashSet::new();

for share in metadata_shares {
validate_response_share(
share,
callback_id,
flex_committee,
&mut seen_signers,
consensus_registry_version,
&*self.crypto,
)
.map_err(CanisterHttpPayloadValidationError::InvalidArtifact)?;
}

// Registry version must match
if entry.proof.content.registry_version != consensus_registry_version {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::RegistryVersionMismatch {
expected: consensus_registry_version,
received: entry.proof.content.registry_version,
},
);
if metadata_shares.len() < *min_responses as usize {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleInsufficientMetadataShareCount {
callback_id,
share_count: metadata_shares.len(),
min_needed: *min_responses as usize,
},
);
Comment thread
fspreiss marked this conversation as resolved.
}
// Verify the smallest `min_responses` response-with-proof sizes
// actually exceed MAX_CANISTER_HTTP_PAYLOAD_SIZE.
let canister_id = &context.request.sender;
let mut entry_sizes: Vec<usize> = metadata_shares
.iter()
.map(|share| {
FlexibleCanisterHttpResponseWithProof::count_bytes_from_parts(
canister_id,
share.content.content_size as usize,
share,
)
})
.collect();
entry_sizes.sort_unstable();
Comment thread
eichhorl marked this conversation as resolved.
let smallest_sum: usize = entry_sizes[..*min_responses as usize].iter().sum();
if smallest_sum <= MAX_CANISTER_HTTP_PAYLOAD_SIZE {
return invalid_artifact(
InvalidCanisterHttpPayloadReason::FlexibleResponsesNotTooLarge(
callback_id,
),
);
}
}

// Verify the individual share signature
self.crypto
.verify(&entry.proof, consensus_registry_version)
.map_err(|err| {
CanisterHttpPayloadValidationError::InvalidArtifact(
InvalidCanisterHttpPayloadReason::SignatureError(Box::new(err)),
)
})?;
}
}

Expand Down
Loading
Loading