Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ztest = { git = "https://github.com/oxidecomputer/falcon" }
curl = "0.4.49"
octocrab = "0.49"
libnet = { git = "https://github.com/oxidecomputer/netadm-sys", branch = "main" }
p4rs = { git = "https://github.com/oxidecomputer/p4", branch = "main" }
p4rs = { git = "https://github.com/oxidecomputer/p4", branch = "zl/multicast" }
sha256 = "1.6.0"
camino = "1.2.2"
thiserror = "2.0.18"
Expand Down
1 change: 1 addition & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ edition = "2021"
p4rs.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
110 changes: 110 additions & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,31 @@ use tokio::net::UnixDatagram;
// Re-export p4rs so consumers can rely on matching types
pub use p4rs;

/// Multicast group identifier.
pub type MulticastGroupId = u16;

/// Physical port number within a multicast group.
pub type MulticastPort = u16;

#[derive(Debug, Default, Serialize, Deserialize)]
pub enum ManagementRequest {
#[default]
RadixRequest,
TableAdd(TableAdd),
TableRemove(TableRemove),
DumpRequest,
MulticastGroupCreate(MulticastGroupCreate),
MulticastGroupRemove(MulticastGroupRemove),
MulticastPortAdd(MulticastPortAdd),
MulticastPortRemove(MulticastPortRemove),
MulticastGroupList,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ManagementResponse {
RadixResponse(u16),
DumpResponse(BTreeMap<String, Vec<TableEntry>>),
MulticastGroupListResponse(BTreeMap<MulticastGroupId, Vec<MulticastPort>>),
}

#[derive(Debug, Default, Serialize, Deserialize)]
Expand All @@ -41,6 +53,37 @@ pub struct TableRemove {
pub keyset_data: Vec<u8>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MulticastGroupCreate {
pub group_id: MulticastGroupId,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MulticastGroupRemove {
pub group_id: MulticastGroupId,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MulticastPortAdd {
pub group_id: MulticastGroupId,
pub port: MulticastPort,
pub rid: u16,
pub level1_excl_id: u16,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MulticastPortRemove {
pub group_id: MulticastGroupId,
pub port: MulticastPort,
}

/// Errors from multicast management operations.
#[derive(Debug, thiserror::Error)]
pub enum MulticastError {
#[error("group ID 0 is reserved (the runtime treats 0 as no-multicast)")]
ReservedGroupId,
}

pub async fn handle_management_message(
msg: ManagementRequest,
pipeline: &Mutex<Box<dyn Pipeline>>,
Expand Down Expand Up @@ -85,5 +128,72 @@ pub async fn handle_management_message(
let buf = serde_json::to_vec(&response).unwrap();
uds.send_to(&buf, uds_dst).await.unwrap();
}
// Mutating multicast ops are fire-and-forget, matching the
// TableAdd/TableRemove pattern. Only MulticastGroupList returns
// a response.
ManagementRequest::MulticastGroupCreate(req) => {
if let Err(err) = validate_group_id(req.group_id) {
eprintln!("MulticastGroupCreate rejected: {err}");
return;
}
let mut pl = pipeline.lock().unwrap();
pl.add_mcast_group(req.group_id);
}
ManagementRequest::MulticastGroupRemove(req) => {
if let Err(err) = validate_group_id(req.group_id) {
eprintln!("MulticastGroupRemove rejected: {err}");
return;
}
let mut pl = pipeline.lock().unwrap();
pl.remove_mcast_group(req.group_id);
}
ManagementRequest::MulticastPortAdd(req) => {
if let Err(err) = validate_group_id(req.group_id) {
eprintln!("MulticastPortAdd rejected: {err}");
return;
}
// rid and level1_excl_id are Tofino traffic manager concepts
// for per-replica identification and exclusion. SoftNPU handles
// these via McastReplicationTag in the codegen instead.
//
// Dendrite passes non-zero rid (set to external_group_id) as
// part of the Tofino replication config, meaning its accepted but
// unused.
let mut pl = pipeline.lock().unwrap();
pl.add_mcast_port(req.group_id, req.port);
}
ManagementRequest::MulticastPortRemove(req) => {
if let Err(err) = validate_group_id(req.group_id) {
eprintln!("MulticastPortRemove rejected: {err}");
return;
}
let mut pl = pipeline.lock().unwrap();
pl.remove_mcast_port(req.group_id, req.port);
}
ManagementRequest::MulticastGroupList => {
let result = {
let pl = pipeline.lock().unwrap();
pl.get_mcast_groups()
.iter()
.map(|(&group_id, ports)| {
let mut sorted: Vec<MulticastPort> =
ports.iter().copied().collect();
sorted.sort();
(group_id, sorted)
})
.collect::<BTreeMap<MulticastGroupId, Vec<MulticastPort>>>()
};
let response =
ManagementResponse::MulticastGroupListResponse(result);
let buf = serde_json::to_vec(&response).unwrap();
uds.send_to(&buf, uds_dst).await.unwrap();
}
}
}

fn validate_group_id(group_id: MulticastGroupId) -> Result<(), MulticastError> {
if group_id == 0 {
return Err(MulticastError::ReservedGroupId);
}
Ok(())
}