9 changes: 8 additions & 1 deletion crates/bevy_ecs/Cargo.toml
@@ -34,7 +34,7 @@ bevy_reflect = ["dep:bevy_reflect"]
reflect_functions = ["bevy_reflect", "bevy_reflect/functions"]

## Use the configurable global error handler as the default error handler.
##
##
## This is typically used to turn panics from the ECS into loggable errors.
## This may be useful for production builds,
## but can result in a measurable performance impact, especially for commands.
@@ -85,6 +85,7 @@ std = [
"arrayvec?/std",
"log/std",
"bevy_platform_support/std",
"async-channel/std",
]

## `critical-section` provides the building blocks for synchronization primitives
@@ -132,12 +133,18 @@ variadics_please = { version = "1.1", default-features = false }
tracing = { version = "0.1", default-features = false, optional = true }
log = { version = "0.4", default-features = false }
bumpalo = "3"
async-channel = { version = "2.3", default-features = false }

[target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies]
concurrent-queue = { version = "2.5.0", default-features = false, features = [
"portable-atomic",
] }

# This is an unused direct dependency, but it enables the portable-atomic feature on event-listener (a dependency of async-channel) for no_std support
event-listener = { version = "5", default-features = false, features = [
"portable-atomic",
] }

[dev-dependencies]
rand = "0.8"
static_assertions = "1.1.0"
232 changes: 230 additions & 2 deletions crates/bevy_ecs/src/entity/mod.rs
@@ -73,7 +73,10 @@ use crate::{
storage::{SparseSetIndex, TableId, TableRow},
};
use alloc::vec::Vec;
use bevy_platform_support::sync::atomic::Ordering;
use bevy_platform_support::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use core::{fmt, hash::Hash, mem, num::NonZero, panic::Location};
use log::warn;

@@ -515,6 +518,91 @@ impl<'a> core::iter::FusedIterator for ReserveEntitiesIterator<'a> {}
// SAFETY: Newly reserved entity values are unique.
unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {}

/// This is the shared data behind [`RemoteEntities`].
#[derive(Debug)]
struct RemoteEntitiesInner {
/// The number of requests that have been made since the last [`force_remote_fulfill`](Entities::force_remote_fulfill).
recent_requests: AtomicU32,
/// The number of entities we're trying to keep in the channel for low latency access.
in_channel: AtomicU32,

// Channels for sending and receiving reserved entities.
reserved: async_channel::Receiver<Entity>,
reserver: async_channel::Sender<Entity>,
}

/// An error that occurs when an [`Entity`] cannot be reserved remotely.
/// See also [`RemoteEntities`].
#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoteReservationError {
/// This happens when the [`Entities`] is closed, dropped, etc. while a [`RemoteEntities`] is trying to reserve from it.
#[error("A remote entity reserver tried to reserve an entity from a closed `Entities`.")]
Closed,
}

impl RemoteEntitiesInner {
fn new() -> Self {
let (sender, receiver) = async_channel::unbounded();
Self {
recent_requests: AtomicU32::new(0),
reserver: sender,
reserved: receiver,
in_channel: AtomicU32::new(0),
}
}

fn close(&self) {
self.reserved.close();
}
}

/// Manages access to [`Entities`] from any thread and from async contexts.
#[derive(Clone)]
pub struct RemoteEntities {
inner: Arc<RemoteEntitiesInner>,
}

impl RemoteEntities {
/// Reserves an [`Entity`] from an async context.
///
/// # Example
///
/// ```
/// use bevy_ecs::prelude::*;
///
/// let mut world = World::new();
/// let remote = world.entities().get_remote();
///
/// // The reservation is async, so we run it on a separate thread.
/// let thread = std::thread::spawn(move || {
/// let future = async {
/// for _ in 0..100 {
/// remote.reserve_entity().await.unwrap();
/// }
/// };
/// bevy_tasks::block_on(future);
/// });
///
/// // We need to keep flushing the entities, or the remote reservations will get stuck.
/// while !thread.is_finished() {
/// world.flush();
/// }
/// ```
pub async fn reserve_entity(&self) -> Result<Entity, RemoteReservationError> {
self.inner.recent_requests.fetch_add(1, Ordering::Relaxed);
self.inner
.reserved
.recv()
.await
.map_err(|_| RemoteReservationError::Closed)
}

/// Returns true only if the [`Entities`] has discontinued this remote access.
pub fn is_closed(&self) -> bool {
self.inner.reserved.is_closed()
}
}

/// A [`World`]'s internal metadata store on all of its entities.
///
/// Contains metadata on:
@@ -525,6 +613,10 @@ unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {}
/// [`World`]: crate::world::World
#[derive(Debug)]
pub struct Entities {
/// This is the number of entities we keep ready for remote reservations via [`RemoteEntities::reserve_entity`].
/// A value too high can cause excess memory to be used, but a value too low can cause additional waiting.
pub entities_hot_for_remote: u32,
remote: Arc<RemoteEntitiesInner>,
Contributor:

Nit: Could we just make this store a RemoteEntities? That way we don't have to worry about constructing one in get_remote and can just blindly clone. Also means if we change the details of RemoteEntities we don't have to worry about how we construct it each time.

Contributor Author:

We could do this, and if v6 doesn't end up evolving much, I'm on board with this. However, many of the ways we can improve this involve caching some atomic results in a non-atomic field that needs &mut. More specifically, we'd need RemoteEntities to have additional per-instance state in addition to the shared state in Arc<RemoteEntitiesInner>. But Entities isn't a remote instance, so it doesn't make sense to include that per-instance state in Entities.

I used this a lot in v4. If v6 is merged, I'll start slowly moving concepts from v4 into it to try to improve performance. If none of those changes are merged, then I can follow up by simplifying this per your suggestion. It's preemptive future-proofing, if that makes sense.
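
To make the tradeoff concrete, here is a rough sketch of the two layouts under discussion; all names below (EntitiesAlt, RemoteEntitiesWithCache, cached_requests) are hypothetical and not part of the PR.

// Hypothetical sketch, not part of the PR.
// Reviewer's layout: store a `RemoteEntities` directly so `get_remote` is a plain clone.
struct EntitiesAlt {
    remote: RemoteEntities,
    // ...remaining fields as in the PR
}

impl EntitiesAlt {
    fn get_remote(&self) -> RemoteEntities {
        self.remote.clone()
    }
}

// Author's concern: a future `RemoteEntities` may grow per-instance state on top of the
// shared `Arc<RemoteEntitiesInner>`, which `Entities` itself would have no use for.
struct RemoteEntitiesWithCache {
    inner: Arc<RemoteEntitiesInner>,
    cached_requests: u32, // hypothetical per-instance cache needing `&mut`
}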

meta: Vec<EntityMeta>,

/// The `pending` and `free_cursor` fields describe three sets of Entity IDs
@@ -572,11 +664,23 @@ pub struct Entities {
}

impl Entities {
pub(crate) const fn new() -> Self {
/// The default value of [`entities_hot_for_remote`](Self::entities_hot_for_remote).
pub const DEFAULT_HOT_REMOTE_ENTITIES: u32 = 256;

pub(crate) fn new() -> Self {
Entities {
meta: Vec::new(),
pending: Vec::new(),
free_cursor: AtomicIdCursor::new(0),
remote: Arc::new(RemoteEntitiesInner::new()),
entities_hot_for_remote: Self::DEFAULT_HOT_REMOTE_ENTITIES,
}
}

/// Constructs a new [`RemoteEntities`] for this instance.
pub fn get_remote(&self) -> RemoteEntities {
RemoteEntities {
inner: self.remote.clone(),
}
}

@@ -821,6 +925,8 @@ impl Entities {
self.meta.clear();
self.pending.clear();
*self.free_cursor.get_mut() = 0;
self.remote.close();
Contributor:

This will disconnect any outstanding RemoteEntities values so that they start failing to reserve. Is that really what we want to do in this case? It might be less disruptive to simply drain the queue.

Although it probably doesn't matter, since nobody will call clear() in a real application.

Contributor Author:

I prefer this behavior. What if a clear happens while an asset is loading? Now we have to make sure every included entity is valid instead of just making sure one is. Right now, the minute it clears, all current remote reservers fail, which I think is a more transparent way of error handling.

(But like you said, it doesn't really matter).

self.remote = Arc::new(RemoteEntitiesInner::new());
}

/// Returns the location of an [`Entity`].
Expand Down Expand Up @@ -934,6 +1040,41 @@ impl Entities {
&mut meta.location,
);
}

if self.remote.recent_requests.load(Ordering::Relaxed) > 0 {
// TODO: add core::intrinsics::unlikely once stable
self.force_remote_fulfill(init);
}
}

// We do this to hint to the compiler that the if branch is unlikely to be taken.
#[cold]
fn force_remote_fulfill(&mut self, init_allocated: impl FnMut(Entity, &mut EntityLocation)) {
let in_channel = self.entities_hot_for_remote;
let mut init_allocated = init_allocated;
let to_fulfill = self.remote.recent_requests.swap(0, Ordering::Relaxed);
let current_in_channel = self.remote.in_channel.load(Ordering::Relaxed);
Contributor:

Another option here is to store the (signed) difference between in_channel and recent_requests. Then you only have one value to update! You can use the existing IdCursor type for signed numbers. This would reduce to something like:

if self.remote.net_in_channel.load(Ordering::Relaxed) < self.entities_hot_for_remote {
    // ...
    let old_net_in_channel = self.remote.net_in_channel
        .fetch_max(self.entities_hot_for_remote, Ordering::Relaxed);
    let to_fulfill = self.entities_hot_for_remote - old_net_in_channel;
    if to_fulfill > 0 {

Contributor Author:

I suppose we could, but I'm not sure what the motivation would be. IIUC, it's the same number of atomic ops and the same amount of data stored. Actually it's 8 bytes instead of 4, but still. I'm not opposed to it though, if there's a reason to do this that I'm just missing.

Contributor:

Oh, I was just trying to simplify the saturating_sub arithmetic by having one counter to modify instead of two.
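
For reference, a hedged end-to-end sketch of that single-counter variant, assuming a hypothetical signed net_in_channel field; none of this is in the PR, and location initialization is elided.

// Hypothetical sketch of the single-counter variant; not part of the PR.
// `net_in_channel` tracks (entities in the channel) minus (unserved remote requests).

// Request side: each remote reservation decrements the counter before awaiting the channel.
//     self.inner.net_in_channel.fetch_sub(1, Ordering::Relaxed);

// Flush side: raise the counter back up to the target with a single atomic op,
// then allocate exactly the difference into the channel.
let target = self.entities_hot_for_remote as i64;
let old = self.remote.net_in_channel.fetch_max(target, Ordering::Relaxed);
let should_reserve = (target - old).max(0);
for _ in 0..should_reserve {
    let entity = self.alloc();
    // (EntityLocation initialization elided for brevity.)
    // The channel is unbounded, so this only fails if it has been closed.
    let _ = self.remote.reserver.try_send(entity);
}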

let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel); // should_reserve = to_fulfill + (in_channel - current_in_channel)
let new_in_channel = current_in_channel + should_reserve - to_fulfill; // new_in_channel = current_in_channel + (should_reserve - to_fulfill).
self.remote
.in_channel
.store(new_in_channel, Ordering::Relaxed);

self.reserve(should_reserve);
for _ in 0..should_reserve {
let entity = self.alloc();
// SAFETY: we just allocated it
let loc = unsafe {
&mut self
.meta
.get_unchecked_mut(entity.index() as usize)
.location
};
init_allocated(entity, loc);
let result = self.remote.reserver.try_send(entity);
// It should not be closed and it can't get full.
debug_assert!(result.is_ok());
}
}

/// Flushes all reserved entities to an "invalid" state. Attempting to retrieve them will return `None`
@@ -1035,6 +1176,13 @@ impl Entities {
}
}

impl Drop for Entities {
fn drop(&mut self) {
// Make sure remote entities are informed.
self.remote.close();
}
}

/// An error that occurs when a specified [`Entity`] does not exist.
#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
#[error("The entity with ID {entity} {details}")]
@@ -1217,6 +1365,86 @@ mod tests {
assert!(next_entity.generation() > entity.generation() + GENERATIONS);
}

#[cfg(feature = "std")]
fn test_remote_reservation<const RAND: bool>(entities: &mut Entities) {
use bevy_tasks::block_on;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::thread;

let mut rng = StdRng::seed_from_u64(89274528);

let mut threads = (0..3)
.map(|_| {
let reserver = entities.get_remote();
thread::spawn(move || {
let future = async {
for _ in 0..100 {
reserver.reserve_entity().await.unwrap();
}
};
block_on(future);
})
})
.collect::<Vec<_>>();

let timeout = std::time::Instant::now();
loop {
threads.retain(|thread| !thread.is_finished());
entities.flush_as_invalid();
if RAND {
entities.entities_hot_for_remote = rng.r#gen::<u32>() & 127;
}
if threads.is_empty() {
break;
}
if timeout.elapsed().as_secs() > 60 {
panic!("remote entities timed out.")
}
}

assert_eq!(
entities.remote.in_channel.load(Ordering::Relaxed),
entities.remote.reserved.len() as u32
);
}

#[test]
#[cfg(feature = "std")]
fn remote_reservation_empty_hot() {
let mut entities = Entities::new();
entities.entities_hot_for_remote = 0;
test_remote_reservation::<false>(&mut entities);
}

#[test]
#[cfg(feature = "std")]
fn remote_reservation_standard_hot() {
let mut entities = Entities::new();
// Lower batch size so more waiting is tested.
entities.entities_hot_for_remote = 16;
test_remote_reservation::<false>(&mut entities);
}

#[test]
#[cfg(feature = "std")]
fn remote_reservation_frequently_changed_hot() {
let mut entities = Entities::new();

entities.entities_hot_for_remote = 16;
test_remote_reservation::<false>(&mut entities);
test_remote_reservation::<true>(&mut entities);

// Higher batch size so larger channel fills are tested.
entities.entities_hot_for_remote = 1024;
test_remote_reservation::<false>(&mut entities);
test_remote_reservation::<true>(&mut entities);

entities.entities_hot_for_remote = 1024;
test_remote_reservation::<false>(&mut entities);
entities.entities_hot_for_remote = 0;
test_remote_reservation::<false>(&mut entities);
}

#[test]
#[expect(
clippy::nonminimal_bool,