Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions vm/devices/virtio/virtio/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,19 +325,27 @@ impl VirtioQueue {
}
}

/// Polls until the queue is kicked by the guest, indicating new work may be available.
/// Polls until the queue is kicked by the guest, indicating new work may be
/// available.
///
/// Before sleeping, this arms kick notification and rechecks the queue. If
/// new data arrived during arming, it returns immediately without sleeping.
/// On wakeup, kicks are suppressed to avoid unnecessary doorbells while
/// the caller drains the queue.
pub fn poll_kick(&mut self, cx: &mut Context<'_>) -> Poll<()> {
ready!(self.queue_event.wait().poll_unpin(cx)).expect("waits on Event cannot fail");
if self.core.arm_for_kick() {
ready!(self.queue_event.wait().poll_unpin(cx)).expect("waits on Event cannot fail");
}
Poll::Ready(())
}

/// Try to get the next work item from the queue. Returns `Ok(None)` if no
/// work is currently available, or an error if there was an issue accessing
/// the queue.
///
/// If `None` is returned, then the queue will be armed so that the guest
/// will kick it when new work is available; the caller can use
/// [`poll_kick`](Self::poll_kick) to wait for this.
/// This is a lightweight check that does not arm kick notification. When
/// used in a poll loop with [`poll_kick`](Self::poll_kick), the kick will
/// be armed automatically before sleeping.
pub fn try_next(&mut self) -> Result<Option<VirtioQueueCallbackWork>, Error> {
Ok(self
.core
Expand Down
56 changes: 56 additions & 0 deletions vm/devices/virtio/virtio/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ pub(crate) struct QueueCoreGetWork {
mem: GuestMemory,
#[inspect(flatten)]
inner: QueueGetWorkInner,
/// Whether kick notification is currently armed.
armed: bool,
}

impl QueueCoreGetWork {
Expand Down Expand Up @@ -178,6 +180,7 @@ impl QueueCoreGetWork {
features,
mem,
inner,
armed: false,
})
}

Expand All @@ -202,9 +205,62 @@ impl QueueCoreGetWork {
QueueGetWorkInner::Packed(packed) => packed.is_available()?,
};
let Some(index) = index else { return Ok(None) };
self.suppress_if_armed();
self.work_from_index(index).map(Some)
}

/// Enables guest kick (doorbell) notification for this queue.
///
/// Returns `true` when the caller should go to sleep and wait for a kick,
/// and `false` when work showed up while arming, in which case the caller
/// should call [`try_next_work`](Self::try_next_work) again instead of
/// sleeping.
///
/// Calling this while the queue is already armed does nothing and returns
/// `true`.
pub fn arm_for_kick(&mut self) -> bool {
    if !self.armed {
        let outcome = match &mut self.inner {
            QueueGetWorkInner::Split(queue) => queue.arm_kick(),
            QueueGetWorkInner::Packed(queue) => queue.arm_kick(),
        };
        self.armed = match outcome {
            // `Ok(true)`: armed, sleep. `Ok(false)`: work raced in, stay
            // unarmed so the caller retries.
            Ok(armed) => armed,
            Err(err) => {
                tracelimit::error_ratelimited!(
                    error = &err as &dyn std::error::Error,
                    "failed to arm kick"
                );
                // Report the queue as armed on failure. Callers treat
                // `false` as "retry immediately", so returning it here
                // would turn a persistent error into a busy loop.
                true
            }
        };
    }
    self.armed
}

/// Turns guest kicks back off if they are currently armed; otherwise does
/// nothing.
///
/// This runs on the path where work has just been found, so the guest does
/// not keep ringing the doorbell while the queue is being drained.
fn suppress_if_armed(&mut self) {
    if !self.armed {
        return;
    }
    // Clear the flag up front: a failed suppression is only logged, and the
    // queue is considered unarmed either way.
    self.armed = false;
    let outcome = match &self.inner {
        QueueGetWorkInner::Split(queue) => queue.suppress_kicks(),
        QueueGetWorkInner::Packed(queue) => queue.suppress_kicks(),
    };
    if let Err(err) = outcome {
        tracelimit::error_ratelimited!(
            error = &err as &dyn std::error::Error,
            "failed to suppress kicks"
        );
    }
}

/// Advances the available index after a successful
/// [`try_peek_work`](Self::try_peek_work) call.
pub fn advance(&mut self, work: &QueueWork) {
Expand Down
78 changes: 55 additions & 23 deletions vm/devices/virtio/virtio/src/queue/packed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::queue::QueueDescriptor;
use crate::queue::QueueError;
use crate::queue::QueueParams;
use crate::queue::descriptor_offset;
use crate::queue::read_descriptor;
use crate::spec::VirtioDeviceFeatures;
use crate::spec::queue as spec;
use crate::spec::queue::DescriptorFlags;
Expand Down Expand Up @@ -47,6 +46,7 @@ pub(crate) struct PackedQueueGetWork {
queue_size: u16,
next_avail_index: u16,
wrapped_bit: bool,
next_is_available: bool,
}

impl PackedQueueGetWork {
Expand All @@ -73,6 +73,7 @@ impl PackedQueueGetWork {
queue_size: params.size,
next_avail_index: initial_index,
wrapped_bit: initial_wrap,
next_is_available: false,
})
}

Expand All @@ -81,33 +82,60 @@ impl PackedQueueGetWork {
self.next_avail_index | (u16::from(self.wrapped_bit) << 15)
}

pub fn is_available(&self) -> Result<Option<u16>, QueueError> {
loop {
let disable_event =
PackedEventSuppression::new().with_flags(EventSuppressionFlags::Disabled);
self.device_event
.write_plain(0, &disable_event)
/// Checks whether a descriptor is available, returning its index.
///
/// This is a lightweight check that does not arm kick notification. When
/// `None` is returned, the caller must call [`arm_kick`](Self::arm_kick)
/// before sleeping to ensure the guest will send a kick when new work
/// arrives.
pub fn is_available(&mut self) -> Result<Option<u16>, QueueError> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there value in returning something other than an `Option` to make this behavior more explicit? This sounds like a bug waiting to happen, i.e., a caller forgets to call `arm_kick`.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is the intention that most callers should not call this directly, and use the poll wrappers in common.rs?

if !self.next_is_available {
let flags: DescriptorFlags = self
.queue_desc
.read_plain(
descriptor_offset(self.next_avail_index)
+ std::mem::offset_of!(PackedDescriptor, flags_raw) as u64,
)
.map_err(QueueError::Memory)?;
atomic::fence(atomic::Ordering::Acquire);
let descriptor: PackedDescriptor =
read_descriptor(&self.queue_desc, self.next_avail_index)?;
let flags = descriptor.flags();
if flags.available() == self.wrapped_bit && flags.used() != self.wrapped_bit {
return Ok(Some(self.next_avail_index));
}
let enable_event =
PackedEventSuppression::new().with_flags(EventSuppressionFlags::Enabled);
self.device_event
.write_plain(0, &enable_event)
.map_err(QueueError::Memory)?;
atomic::fence(atomic::Ordering::SeqCst);
let descriptor: PackedDescriptor =
read_descriptor(&self.queue_desc, self.next_avail_index)?;
let flags = descriptor.flags();
if flags.available() != self.wrapped_bit || flags.used() == self.wrapped_bit {
return Ok(None);
}
// Ensure subsequent descriptor-field reads cannot be reordered
// before the flags read on weakly ordered architectures.
atomic::fence(atomic::Ordering::Acquire);
self.next_is_available = true;
}
Ok(Some(self.next_avail_index))
}

/// Enables guest kick notification, then rechecks the queue for work.
///
/// # Returns
///
/// * `Ok(true)` - notification is armed and no descriptor is available;
///   the caller should sleep until kicked.
/// * `Ok(false)` - a descriptor became available while arming; kicks have
///   been suppressed again and the caller should retry.
///
/// # Errors
///
/// Returns a [`QueueError`] if writing the event suppression structure or
/// checking the next descriptor fails.
pub fn arm_kick(&mut self) -> Result<bool, QueueError> {
    let enabled = PackedEventSuppression::new().with_flags(EventSuppressionFlags::Enabled);
    self.device_event
        .write_plain(0, &enabled)
        .map_err(QueueError::Memory)?;
    // The enable write must be visible before the descriptor is rechecked;
    // otherwise a descriptor published in between could be missed by both
    // sides.
    atomic::fence(atomic::Ordering::SeqCst);
    match self.is_available()? {
        Some(_) => {
            // Work raced in while arming: undo the arm and tell the caller
            // to retry rather than sleep.
            self.suppress_kicks()?;
            Ok(false)
        }
        None => Ok(true),
    }
}

/// Tells the guest to stop sending kick notifications.
///
/// Intended to be called once work has been found, so that no unnecessary
/// kicks arrive while that work is being processed.
///
/// # Errors
///
/// Returns [`QueueError::Memory`] if the event suppression structure cannot
/// be written.
pub fn suppress_kicks(&self) -> Result<(), QueueError> {
    self.device_event
        .write_plain(
            0,
            &PackedEventSuppression::new().with_flags(EventSuppressionFlags::Disabled),
        )
        .map_err(QueueError::Memory)?;
    Ok(())
}

/// Advances `next_avail_index` by `count` descriptors.
Expand All @@ -117,6 +145,7 @@ impl PackedQueueGetWork {
self.wrapped_bit = !self.wrapped_bit;
}
self.next_avail_index = next_avail_index;
self.next_is_available = false;
}
}

Expand Down Expand Up @@ -176,6 +205,9 @@ impl PackedQueueCompleteWork {
.with_available(self.wrapped_bit)
.with_used(self.wrapped_bit),
);
// Ensure any prior writes to guest buffers (e.g. device data) are
// visible before the used descriptor becomes visible to the guest.
atomic::fence(atomic::Ordering::Release);
self.queue_desc
.write_plain(descriptor_offset(self.next_index), &descriptor)
.map_err(QueueError::Memory)?;
Expand Down
69 changes: 52 additions & 17 deletions vm/devices/virtio/virtio/src/queue/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub(crate) struct SplitQueueGetWork {
queue_used: GuestMemory,
queue_size: u16,
last_avail_index: u16,
/// Cached guest avail_index from the last read, to avoid re-reading for
/// each descriptor when draining a batch.
cached_avail_index: u16,
use_ring_event_index: bool,
}

Expand Down Expand Up @@ -57,6 +60,7 @@ impl SplitQueueGetWork {
queue_used,
queue_size: params.size,
last_avail_index: initial_avail_index,
cached_avail_index: initial_avail_index,
use_ring_event_index: features.bank0().ring_event_idx(),
})
}
Expand All @@ -82,32 +86,63 @@ impl SplitQueueGetWork {
/// Checks whether a descriptor is available, returning its wrapped index.
/// Does not advance `last_avail_index`; call [`advance`](Self::advance)
/// to consume the descriptor.
///
/// This is a lightweight check that does not arm kick notification. When
/// `None` is returned, the caller must call [`arm_kick`](Self::arm_kick)
/// before sleeping to ensure the guest will send a kick when new work
/// arrives.
///
/// Reads the guest's avail_index only when the locally cached value has
/// been exhausted, allowing multiple descriptors to be drained per read.
pub fn is_available(&mut self) -> Result<Option<u16>, QueueError> {
let mut avail_index = Self::get_available_index(self)?;
if avail_index == self.last_avail_index {
if self.use_ring_event_index {
self.set_available_event(avail_index)?;
} else {
self.set_used_flags(spec::UsedFlags::new())?;
}
// Ensure the available event/used flags are visible before checking
// the available index again.
atomic::fence(atomic::Ordering::SeqCst);
avail_index = Self::get_available_index(self)?;
if avail_index == self.last_avail_index {
if self.cached_avail_index == self.last_avail_index {
// Re-read the guest's avail_index to see if new work arrived.
self.cached_avail_index = self.get_available_index()?;
if self.cached_avail_index == self.last_avail_index {
return Ok(None);
}
// Ensure available index read is ordered before subsequent
// descriptor reads. Only needed when we actually read from guest
// memory; the cached path reuses a previously-fenced value.
atomic::fence(atomic::Ordering::Acquire);
}
Ok(Some(self.last_avail_index & (self.queue_size - 1)))
}

/// Enables guest kick notification, then rechecks the queue for work.
///
/// When the ring event index feature is in use, arming writes the current
/// `last_avail_index` as the available event; otherwise it writes a default
/// (empty) set of used flags.
///
/// # Returns
///
/// * `Ok(true)` - notification is armed and no descriptor is available;
///   the caller should sleep until kicked.
/// * `Ok(false)` - a descriptor became available while arming; kicks have
///   been suppressed again and the caller should retry.
///
/// # Errors
///
/// Propagates any [`QueueError`] from updating the notification state or
/// from checking for available work.
pub fn arm_kick(&mut self) -> Result<bool, QueueError> {
    if self.use_ring_event_index {
        self.set_available_event(self.last_avail_index)?;
    } else {
        self.set_used_flags(spec::UsedFlags::new())?;
    }
    // The available event / used flags must be visible before the available
    // index is read again, or a concurrently published descriptor could go
    // unnoticed.
    atomic::fence(atomic::Ordering::SeqCst);
    match self.is_available()? {
        Some(_) => {
            // Work raced in while arming: suppress kicks again and have the
            // caller retry instead of sleeping.
            self.suppress_kicks()?;
            Ok(false)
        }
        None => Ok(true),
    }
}

/// Suppress kick notifications from the guest. Call this after finding
/// work to avoid unnecessary kicks while processing.
///
/// With `EVENT_IDX`, the `avail_event` value set during arming is
/// inherently index-based and will go stale as we drain descriptors.
/// There's no good way to express "don't kick" — any fixed index will
/// eventually be hit on wrap. So we skip the write; the worst case is
/// one spurious kick per u16 wrap (every 65536 descriptors).
pub fn suppress_kicks(&self) -> Result<(), QueueError> {
if !self.use_ring_event_index {
self.set_used_flags(spec::UsedFlags::new().with_no_notify(true))?;
}
// Ensure available index read is ordered before subsequent descriptor
// reads.
atomic::fence(atomic::Ordering::Acquire);
Ok(Some(self.last_avail_index % self.queue_size))
Ok(())
}

/// Advances `last_avail_index` by one, consuming the descriptor returned
Expand Down Expand Up @@ -230,7 +265,7 @@ impl SplitQueueCompleteWork {
descriptor_index: u16,
bytes_written: u32,
) -> Result<(), QueueError> {
let wrapped_index = (queue_last_used_index % self.queue_size) as u64;
let wrapped_index = (queue_last_used_index & (self.queue_size - 1)) as u64;
let addr = spec::USED_OFFSET_RING + spec::USED_ELEMENT_SIZE * wrapped_index;
self.queue_used
.write_plain(
Expand Down
14 changes: 14 additions & 0 deletions vm/devices/virtio/virtio_spec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,26 @@ pub mod queue {
}
}

/// Flags controlling event (interrupt/notification) suppression for packed
/// virtqueues.
///
/// Each variant's explicit discriminant is the numeric flag value quoted in
/// its documentation, and `#[repr(u8)]` stores that discriminant as a
/// single byte.
///
/// Reference: virtio spec §2.8.10, "Event Suppression Structure Layout".
#[derive(Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum EventSuppressionFlags {
/// `RING_EVENT_FLAGS_ENABLE` (0x0) — events are enabled; the device/driver
/// should generate events (interrupts or notifications) normally.
Enabled = 0,
/// `RING_EVENT_FLAGS_DISABLE` (0x1) — events are disabled; the
/// device/driver should not generate any events.
Disabled = 1,
/// `RING_EVENT_FLAGS_DESC` (0x2) — enable events only when a specific
/// descriptor index (with matching wrap counter) is reached, as
/// specified by the `offset` and `wrap` fields of the
/// [`PackedEventSuppression`] structure.
DescriptorIndex = 2,
/// Reserved value (0x3). Treated as "events enabled" by this
/// implementation for forward compatibility.
Reserved = 3,
}
impl EventSuppressionFlags {
Expand Down
Loading