diff --git a/src/transport/quic/runtime.zig b/src/transport/quic/runtime.zig index 07ac85a..c8289e9 100644 --- a/src/transport/quic/runtime.zig +++ b/src/transport/quic/runtime.zig @@ -1593,6 +1593,14 @@ pub const QuicRuntime = struct { /// matching the inline path's per-call recv bound. const inbound_drain_per_call: usize = 1024; + /// Max bytes the BULK (block) gossip lane offers to the transport per drive + /// tick per stream. Bounds the outbound drive phase so a multi-MB block fed + /// into zquic's pending queue can't dominate a lap (was SLOW drive iter + /// outbound=700ms+, starving the priority/attestation lane). At ~256 KiB/tick + /// a ~3 MiB block ships over ~12 ticks (well within one slot) while + /// attestations drain every tick. Priority lane is unbudgeted. + const gossip_bulk_drain_budget_bytes: usize = 256 * 1024; + /// Re-drain the listener socket + flush server-side conn ACKs if at least /// `drive_inbound_pump_interval_ms` has elapsed since the last pump. Cheap /// when called too soon (a clock read + compare). Called from inside the @@ -2617,11 +2625,27 @@ pub const QuicRuntime = struct { lane: *std.ArrayList([]u8), partial_flag: *bool, coalesce_buf: []u8, + byte_budget: ?usize, ) struct { accepted_any: bool, backpressured: bool } { const a = self.allocator; var accepted_any: bool = false; var backpressured: bool = false; + // Bytes offered to the transport this call. The BULK (block) lane passes a + // budget so one ~3 MiB block can't be shoved into zquic's pending queue in + // a single tick — that built a multi-MB pending backlog whose encryption + // dominated the outbound drive phase (SLOW drive iter outbound=700ms+) and + // starved the priority (attestation) lane → dropped attestations → + // coverage decay. With the budget the block is fed over several ticks and + // the priority lane drains every tick. Checked only at FRAME boundaries + // (partial_flag clear) so an in-flight frame is never interrupted. + var sent_this_call: usize = 0; while (lane.items.len > 0) { + if (byte_budget) |bb| { + // `sent_this_call > 0` guarantees at least one frame ships even if + // bb is misconfigured to 0 (else iteration 1 would break before + // sending → guaranteed wedge). + if (sent_this_call > 0 and sent_this_call >= bb and !partial_flag.*) break; + } // Coalesce consecutive WHOLE frames into one MTU-dense write. // gossipsub RPC frames are self-delimiting (uvarint length prefix) // so concatenating whole frames is wire-identical to sending them @@ -2663,6 +2687,7 @@ pub const QuicRuntime = struct { break; } accepted_any = true; + sent_this_call += sent; g.last_write_ms = self.opts.now_ms_fn(); // Drop fully-sent head frames; rewrite the straddling frame (if @@ -2830,7 +2855,9 @@ pub const QuicRuntime = struct { var pri_accepted = false; var pri_backpressured = false; if (!g.outbox_bulk_partial) { - const pres = self.drainGossipLane(sh, g, &g.outbox, &g.outbox_partial, &coalesce_buf); + // Priority (attestations/aggregations/control): drain fully — + // these are small and must not be held back. + const pres = self.drainGossipLane(sh, g, &g.outbox, &g.outbox_partial, &coalesce_buf, null); if (g.broken) continue; pri_accepted = pres.accepted_any; pri_backpressured = pres.backpressured; @@ -2840,7 +2867,9 @@ pub const QuicRuntime = struct { // frame must not be interrupted) AND priority did not just // backpressure (transport is full — bulk would fail too). if (!g.outbox_partial and !pri_backpressured and g.outbox_bulk.items.len > 0) { - const bres = self.drainGossipLane(sh, g, &g.outbox_bulk, &g.outbox_bulk_partial, &coalesce_buf); + // Bulk (blocks): budgeted per tick so a multi-MB block is fed + // over several ticks and never monopolizes the outbound phase. + const bres = self.drainGossipLane(sh, g, &g.outbox_bulk, &g.outbox_bulk_partial, &coalesce_buf, gossip_bulk_drain_budget_bytes); if (g.broken) continue; bulk_accepted = bres.accepted_any; }