diff --git a/Cargo.lock b/Cargo.lock index 5639db3..6421cbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1829,7 +1829,7 @@ dependencies = [ [[package]] name = "p4rs" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/p4?branch=main#a1840304962af6a0d42d5c3ef064778af167d1c0" +source = "git+https://github.com/oxidecomputer/p4?branch=zl%2Fmulticast#0e20707853ef5ec7ce04e3b069053698231f7dc2" dependencies = [ "bitvec", "num", @@ -2786,6 +2786,7 @@ dependencies = [ "p4rs", "serde", "serde_json", + "thiserror 2.0.18", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 6eab14e..3ff64b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ ztest = { git = "https://github.com/oxidecomputer/falcon" } curl = "0.4.49" octocrab = "0.49" libnet = { git = "https://github.com/oxidecomputer/netadm-sys", branch = "main" } -p4rs = { git = "https://github.com/oxidecomputer/p4", branch = "main" } +p4rs = { git = "https://github.com/oxidecomputer/p4", branch = "zl/multicast" } sha256 = "1.6.0" camino = "1.2.2" thiserror = "2.0.18" diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 88ab764..3312ad1 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" p4rs.workspace = true serde.workspace = true serde_json.workspace = true +thiserror.workspace = true tokio.workspace = true diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ffb68ac..6cd32eb 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -12,6 +12,12 @@ use tokio::net::UnixDatagram; // Re-export p4rs so consumers can rely on matching types pub use p4rs; +/// Multicast group identifier. +pub type MulticastGroupId = u16; + +/// Physical port number within a multicast group. +pub type MulticastPort = u16; + #[derive(Debug, Default, Serialize, Deserialize)] pub enum ManagementRequest { #[default] @@ -19,12 +25,18 @@ pub enum ManagementRequest { TableAdd(TableAdd), TableRemove(TableRemove), DumpRequest, + MulticastGroupCreate(MulticastGroupCreate), + MulticastGroupRemove(MulticastGroupRemove), + MulticastPortAdd(MulticastPortAdd), + MulticastPortRemove(MulticastPortRemove), + MulticastGroupList, } #[derive(Debug, Serialize, Deserialize)] pub enum ManagementResponse { RadixResponse(u16), DumpResponse(BTreeMap>), + MulticastGroupListResponse(BTreeMap>), } #[derive(Debug, Default, Serialize, Deserialize)] @@ -41,6 +53,37 @@ pub struct TableRemove { pub keyset_data: Vec, } +#[derive(Debug, Serialize, Deserialize)] +pub struct MulticastGroupCreate { + pub group_id: MulticastGroupId, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MulticastGroupRemove { + pub group_id: MulticastGroupId, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MulticastPortAdd { + pub group_id: MulticastGroupId, + pub port: MulticastPort, + pub rid: u16, + pub level1_excl_id: u16, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MulticastPortRemove { + pub group_id: MulticastGroupId, + pub port: MulticastPort, +} + +/// Errors from multicast management operations. +#[derive(Debug, thiserror::Error)] +pub enum MulticastError { + #[error("group ID 0 is reserved (the runtime treats 0 as no-multicast)")] + ReservedGroupId, +} + pub async fn handle_management_message( msg: ManagementRequest, pipeline: &Mutex>, @@ -85,5 +128,72 @@ pub async fn handle_management_message( let buf = serde_json::to_vec(&response).unwrap(); uds.send_to(&buf, uds_dst).await.unwrap(); } + // Mutating multicast ops are fire-and-forget, matching the + // TableAdd/TableRemove pattern. Only MulticastGroupList returns + // a response. + ManagementRequest::MulticastGroupCreate(req) => { + if let Err(err) = validate_group_id(req.group_id) { + eprintln!("MulticastGroupCreate rejected: {err}"); + return; + } + let mut pl = pipeline.lock().unwrap(); + pl.add_mcast_group(req.group_id); + } + ManagementRequest::MulticastGroupRemove(req) => { + if let Err(err) = validate_group_id(req.group_id) { + eprintln!("MulticastGroupRemove rejected: {err}"); + return; + } + let mut pl = pipeline.lock().unwrap(); + pl.remove_mcast_group(req.group_id); + } + ManagementRequest::MulticastPortAdd(req) => { + if let Err(err) = validate_group_id(req.group_id) { + eprintln!("MulticastPortAdd rejected: {err}"); + return; + } + // rid and level1_excl_id are Tofino traffic manager concepts + // for per-replica identification and exclusion. SoftNPU handles + // these via McastReplicationTag in the codegen instead. + // + // Dendrite passes non-zero rid (set to external_group_id) as + // part of the Tofino replication config, meaning its accepted but + // unused. + let mut pl = pipeline.lock().unwrap(); + pl.add_mcast_port(req.group_id, req.port); + } + ManagementRequest::MulticastPortRemove(req) => { + if let Err(err) = validate_group_id(req.group_id) { + eprintln!("MulticastPortRemove rejected: {err}"); + return; + } + let mut pl = pipeline.lock().unwrap(); + pl.remove_mcast_port(req.group_id, req.port); + } + ManagementRequest::MulticastGroupList => { + let result = { + let pl = pipeline.lock().unwrap(); + pl.get_mcast_groups() + .iter() + .map(|(&group_id, ports)| { + let mut sorted: Vec = + ports.iter().copied().collect(); + sorted.sort(); + (group_id, sorted) + }) + .collect::>>() + }; + let response = + ManagementResponse::MulticastGroupListResponse(result); + let buf = serde_json::to_vec(&response).unwrap(); + uds.send_to(&buf, uds_dst).await.unwrap(); + } + } +} + +fn validate_group_id(group_id: MulticastGroupId) -> Result<(), MulticastError> { + if group_id == 0 { + return Err(MulticastError::ReservedGroupId); } + Ok(()) }