diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index c5350dd91ad17..f638619883f62 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -34,7 +34,7 @@ bevy_reflect = ["dep:bevy_reflect"] reflect_functions = ["bevy_reflect", "bevy_reflect/functions"] ## Use the configurable global error handler as the default error handler. -## +## ## This is typically used to turn panics from the ECS into loggable errors. ## This may be useful for production builds, ## but can result in a measurable performance impact, especially for commands. @@ -85,6 +85,7 @@ std = [ "arrayvec?/std", "log/std", "bevy_platform_support/std", + "async-channel/std", ] ## `critical-section` provides the building blocks for synchronization primitives @@ -132,12 +133,18 @@ variadics_please = { version = "1.1", default-features = false } tracing = { version = "0.1", default-features = false, optional = true } log = { version = "0.4", default-features = false } bumpalo = "3" +async-channel = { version = "2.3", default-features = false } [target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies] concurrent-queue = { version = "2.5.0", default-features = false, features = [ "portable-atomic", ] } +# This is an unused dependency, but it adds the feature to a dependency of async-channel for no_std support +event-listener = { version = "5", default-features = false, features = [ + "portable-atomic", +] } + [dev-dependencies] rand = "0.8" static_assertions = "1.1.0" diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 2756648e94218..c59e5783dfef2 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -73,7 +73,10 @@ use crate::{ storage::{SparseSetIndex, TableId, TableRow}, }; use alloc::vec::Vec; -use bevy_platform_support::sync::atomic::Ordering; +use bevy_platform_support::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, +}; use core::{fmt, hash::Hash, mem, num::NonZero, panic::Location}; use log::warn; @@ -515,6 +518,91 @@ impl<'a> core::iter::FusedIterator for ReserveEntitiesIterator<'a> {} // SAFETY: Newly reserved entity values are unique. unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} +/// This is the shared data behind [`RemoteEntities`]. +#[derive(Debug)] +struct RemoteEntitiesInner { + /// The number of requests that have been made since the last [`force_remote_fulfill`](Entities::force_remote_fulfill). + recent_requests: AtomicU32, + /// The number of entities we're trying to keep in the channel for low latency access. + in_channel: AtomicU32, + + // Channels for sending and receiving reserved entities. + reserved: async_channel::Receiver, + reserver: async_channel::Sender, +} + +/// An error that occurs when an [`Entity`] can not be reserved remotely. +/// See also [`RemoteEntities`]. +#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)] +pub enum RemoteReservationError { + /// This happens when [`Entities`] are closed, dropped, etc while a [`RemoteEntities`] is trying to reserve from it. + #[error("A remote entity reserver tried to reserve an entity from a closed `Entities`.")] + Closed, +} + +impl RemoteEntitiesInner { + fn new() -> Self { + let (sender, receiver) = async_channel::unbounded(); + Self { + recent_requests: AtomicU32::new(0), + reserver: sender, + reserved: receiver, + in_channel: AtomicU32::new(0), + } + } + + fn close(&self) { + self.reserved.close(); + } +} + +/// Manages access to [`Entities`] from any thread and async. +#[derive(Clone)] +pub struct RemoteEntities { + inner: Arc, +} + +impl RemoteEntities { + /// Reserves an [`Entity`] from async. + /// + /// # Example + /// + /// ``` + /// use bevy_ecs::prelude::*; + /// + /// let mut world = World::new(); + /// let remote = world.entities().get_remote(); + /// + /// // The reserve is async so we need it to be on a separate thread. + /// let thread = std::thread::spawn(move || { + /// let future = async { + /// for _ in 0..100 { + /// remote.reserve_entity().await.unwrap(); + /// } + /// }; + /// bevy_tasks::block_on(future); + /// }); + /// + /// // We need to flush the entities as needed or the remote entities will get stuck. + /// while !thread.is_finished() { + /// world.flush(); + /// } + /// ``` + pub async fn reserve_entity(&self) -> Result { + self.inner.recent_requests.fetch_add(1, Ordering::Relaxed); + self.inner + .reserved + .recv() + .await + .map_err(|_| RemoteReservationError::Closed) + } + + /// Returns true only if the [`Entities`] has discontinued this remote access. + pub fn is_closed(&self) -> bool { + self.inner.reserved.is_closed() + } +} + /// A [`World`]'s internal metadata store on all of its entities. /// /// Contains metadata on: @@ -525,6 +613,10 @@ unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} /// [`World`]: crate::world::World #[derive(Debug)] pub struct Entities { + /// This is the number of entities we keep ready for remote reservations via [`RemoteEntities::reserve_entity`]. + /// A value too high can cause excess memory to be used, but a value too low can cause additional waiting. + pub entities_hot_for_remote: u32, + remote: Arc, meta: Vec, /// The `pending` and `free_cursor` fields describe three sets of Entity IDs @@ -572,11 +664,23 @@ pub struct Entities { } impl Entities { - pub(crate) const fn new() -> Self { + /// The default value of [`entities_hot_for_remote`](Self::entities_hot_for_remote). + pub const DEFAULT_HOT_REMOTE_ENTITIES: u32 = 256; + + pub(crate) fn new() -> Self { Entities { meta: Vec::new(), pending: Vec::new(), free_cursor: AtomicIdCursor::new(0), + remote: Arc::new(RemoteEntitiesInner::new()), + entities_hot_for_remote: Self::DEFAULT_HOT_REMOTE_ENTITIES, + } + } + + /// Constructs a new [`RemoteEntities`] for this instance. + pub fn get_remote(&self) -> RemoteEntities { + RemoteEntities { + inner: self.remote.clone(), } } @@ -821,6 +925,8 @@ impl Entities { self.meta.clear(); self.pending.clear(); *self.free_cursor.get_mut() = 0; + self.remote.close(); + self.remote = Arc::new(RemoteEntitiesInner::new()); } /// Returns the location of an [`Entity`]. @@ -934,6 +1040,41 @@ impl Entities { &mut meta.location, ); } + + if self.remote.recent_requests.load(Ordering::Relaxed) > 0 { + // TODO: add core::intrinsics::unlikely once stable + self.force_remote_fulfill(init); + } + } + + // we do this to hint to the compiler that the if branch is unlinkely to be taken. + #[cold] + fn force_remote_fulfill(&mut self, init_allocated: impl FnMut(Entity, &mut EntityLocation)) { + let in_channel = self.entities_hot_for_remote; + let mut init_allocated = init_allocated; + let to_fulfill = self.remote.recent_requests.swap(0, Ordering::Relaxed); + let current_in_channel = self.remote.in_channel.load(Ordering::Relaxed); + let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel); // should_reserve = to_fulfill + (in_channel - current_in_channel) + let new_in_channel = current_in_channel + should_reserve - to_fulfill; // new_in_channel = current_in_channel + (should_reserve - to_fulfill). + self.remote + .in_channel + .store(new_in_channel, Ordering::Relaxed); + + self.reserve(should_reserve); + for _ in 0..should_reserve { + let entity = self.alloc(); + // SAFETY: we just allocated it + let loc = unsafe { + &mut self + .meta + .get_unchecked_mut(entity.index() as usize) + .location + }; + init_allocated(entity, loc); + let result = self.remote.reserver.try_send(entity); + // It should not be closed and it can't get full. + debug_assert!(result.is_ok()); + } } /// Flushes all reserved entities to an "invalid" state. Attempting to retrieve them will return `None` @@ -1035,6 +1176,13 @@ impl Entities { } } +impl Drop for Entities { + fn drop(&mut self) { + // Make sure remote entities are informed. + self.remote.close(); + } +} + /// An error that occurs when a specified [`Entity`] does not exist. #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)] #[error("The entity with ID {entity} {details}")] @@ -1217,6 +1365,86 @@ mod tests { assert!(next_entity.generation() > entity.generation() + GENERATIONS); } + #[cfg(feature = "std")] + fn test_remote_reservation(entities: &mut Entities) { + use bevy_tasks::block_on; + use rand::{rngs::StdRng, Rng, SeedableRng}; + use std::thread; + + let mut rng = StdRng::seed_from_u64(89274528); + + let mut threads = (0..3) + .map(|_| { + let reserver = entities.get_remote(); + thread::spawn(move || { + let future = async { + for _ in 0..100 { + reserver.reserve_entity().await.unwrap(); + } + }; + block_on(future); + }) + }) + .collect::>(); + + let timeout = std::time::Instant::now(); + loop { + threads.retain(|thread| !thread.is_finished()); + entities.flush_as_invalid(); + if RAND { + entities.entities_hot_for_remote = rng.r#gen::() & 127; + } + if threads.is_empty() { + break; + } + if timeout.elapsed().as_secs() > 60 { + panic!("remote entities timed out.") + } + } + + assert_eq!( + entities.remote.in_channel.load(Ordering::Relaxed), + entities.remote.reserved.len() as u32 + ); + } + + #[test] + #[cfg(feature = "std")] + fn remote_reservation_empty_hot() { + let mut entities = Entities::new(); + entities.entities_hot_for_remote = 0; + test_remote_reservation::(&mut entities); + } + + #[test] + #[cfg(feature = "std")] + fn remote_reservation_standard_hot() { + let mut entities = Entities::new(); + // Lower batch size so more waiting is tested. + entities.entities_hot_for_remote = 16; + test_remote_reservation::(&mut entities); + } + + #[test] + #[cfg(feature = "std")] + fn remote_reservation_frequently_changed_hot() { + let mut entities = Entities::new(); + + entities.entities_hot_for_remote = 16; + test_remote_reservation::(&mut entities); + test_remote_reservation::(&mut entities); + + // Lower batch size so more waiting is tested. + entities.entities_hot_for_remote = 1024; + test_remote_reservation::(&mut entities); + test_remote_reservation::(&mut entities); + + entities.entities_hot_for_remote = 1024; + test_remote_reservation::(&mut entities); + entities.entities_hot_for_remote = 0; + test_remote_reservation::(&mut entities); + } + #[test] #[expect( clippy::nonminimal_bool,