Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions src/transport/quic/runtime.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,14 @@ pub const QuicRuntime = struct {
/// matching the inline path's per-call recv bound.
const inbound_drain_per_call: usize = 1024;

/// Max bytes the BULK (block) gossip lane offers to the transport per drive
/// tick per stream. Bounds the outbound drive phase so a multi-MB block fed
/// into zquic's pending queue can't dominate a lap (was SLOW drive iter
/// outbound=700ms+, starving the priority/attestation lane). At ~256 KiB/tick
/// a ~3 MiB block ships over ~12 ticks (well within one slot) while
/// attestations drain every tick. Priority lane is unbudgeted.
const gossip_bulk_drain_budget_bytes: usize = 256 * 1024;

/// Re-drain the listener socket + flush server-side conn ACKs if at least
/// `drive_inbound_pump_interval_ms` has elapsed since the last pump. Cheap
/// when called too soon (a clock read + compare). Called from inside the
Expand Down Expand Up @@ -2617,11 +2625,27 @@ pub const QuicRuntime = struct {
lane: *std.ArrayList([]u8),
partial_flag: *bool,
coalesce_buf: []u8,
byte_budget: ?usize,
) struct { accepted_any: bool, backpressured: bool } {
const a = self.allocator;
var accepted_any: bool = false;
var backpressured: bool = false;
// Bytes offered to the transport this call. The BULK (block) lane passes a
// budget so one ~3 MiB block can't be shoved into zquic's pending queue in
// a single tick — that built a multi-MB pending backlog whose encryption
// dominated the outbound drive phase (SLOW drive iter outbound=700ms+) and
// starved the priority (attestation) lane → dropped attestations →
// coverage decay. With the budget the block is fed over several ticks and
// the priority lane drains every tick. Checked only at FRAME boundaries
// (partial_flag clear) so an in-flight frame is never interrupted.
var sent_this_call: usize = 0;
while (lane.items.len > 0) {
if (byte_budget) |bb| {
// `sent_this_call > 0` guarantees at least one frame ships even if
// bb is misconfigured to 0 (else iteration 1 would break before
// sending → guaranteed wedge).
if (sent_this_call > 0 and sent_this_call >= bb and !partial_flag.*) break;
}
// Coalesce consecutive WHOLE frames into one MTU-dense write.
// gossipsub RPC frames are self-delimiting (uvarint length prefix)
// so concatenating whole frames is wire-identical to sending them
Expand Down Expand Up @@ -2663,6 +2687,7 @@ pub const QuicRuntime = struct {
break;
}
accepted_any = true;
sent_this_call += sent;
g.last_write_ms = self.opts.now_ms_fn();

// Drop fully-sent head frames; rewrite the straddling frame (if
Expand Down Expand Up @@ -2830,7 +2855,9 @@ pub const QuicRuntime = struct {
var pri_accepted = false;
var pri_backpressured = false;
if (!g.outbox_bulk_partial) {
const pres = self.drainGossipLane(sh, g, &g.outbox, &g.outbox_partial, &coalesce_buf);
// Priority (attestations/aggregations/control): drain fully —
// these are small and must not be held back.
const pres = self.drainGossipLane(sh, g, &g.outbox, &g.outbox_partial, &coalesce_buf, null);
if (g.broken) continue;
pri_accepted = pres.accepted_any;
pri_backpressured = pres.backpressured;
Expand All @@ -2840,7 +2867,9 @@ pub const QuicRuntime = struct {
// frame must not be interrupted) AND priority did not just
// backpressure (transport is full — bulk would fail too).
if (!g.outbox_partial and !pri_backpressured and g.outbox_bulk.items.len > 0) {
const bres = self.drainGossipLane(sh, g, &g.outbox_bulk, &g.outbox_bulk_partial, &coalesce_buf);
// Bulk (blocks): budgeted per tick so a multi-MB block is fed
// over several ticks and never monopolizes the outbound phase.
const bres = self.drainGossipLane(sh, g, &g.outbox_bulk, &g.outbox_bulk_partial, &coalesce_buf, gossip_bulk_drain_budget_bytes);
if (g.broken) continue;
bulk_accepted = bres.accepted_any;
}
Expand Down
Loading