Remote entity reservation v6 #18525
Changes from all commits: 5cadec0, e271786, a7124d0, cdd00ce, db53dda, 8860106, b7fd02d, db84c5a, 74de9c0, 8f9f544, 75844c0, 51e115b, 5ec8f4d, af38756, 2082cbf, cf65840, eb4d48b, 3f85f93
```diff
@@ -73,7 +73,10 @@ use crate::{
     storage::{SparseSetIndex, TableId, TableRow},
 };
 use alloc::vec::Vec;
-use bevy_platform_support::sync::atomic::Ordering;
+use bevy_platform_support::sync::{
+    atomic::{AtomicU32, Ordering},
+    Arc,
+};
 use core::{fmt, hash::Hash, mem, num::NonZero, panic::Location};
 use log::warn;
```
```diff
@@ -515,6 +518,91 @@ impl<'a> core::iter::FusedIterator for ReserveEntitiesIterator<'a> {}
 // SAFETY: Newly reserved entity values are unique.
 unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {}

+/// This is the shared data behind [`RemoteEntities`].
+#[derive(Debug)]
+struct RemoteEntitiesInner {
+    /// The number of requests that have been made since the last [`force_remote_fulfill`](Entities::force_remote_fulfill).
+    recent_requests: AtomicU32,
+    /// The number of entities we're trying to keep in the channel for low-latency access.
+    in_channel: AtomicU32,
+
+    // Channels for sending and receiving reserved entities.
+    reserved: async_channel::Receiver<Entity>,
+    reserver: async_channel::Sender<Entity>,
+}
+
+/// An error that occurs when an [`Entity`] cannot be reserved remotely.
+/// See also [`RemoteEntities`].
+#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RemoteReservationError {
+    /// This happens when [`Entities`] is closed, dropped, etc. while a [`RemoteEntities`] is trying to reserve from it.
+    #[error("A remote entity reserver tried to reserve an entity from a closed `Entities`.")]
+    Closed,
+}
+
+impl RemoteEntitiesInner {
+    fn new() -> Self {
+        let (sender, receiver) = async_channel::unbounded();
+        Self {
+            recent_requests: AtomicU32::new(0),
+            reserver: sender,
+            reserved: receiver,
+            in_channel: AtomicU32::new(0),
+        }
+    }
+
+    fn close(&self) {
+        self.reserved.close();
+    }
+}
+
+/// Manages access to [`Entities`] from any thread and from async contexts.
+#[derive(Clone)]
+pub struct RemoteEntities {
+    inner: Arc<RemoteEntitiesInner>,
+}
+
+impl RemoteEntities {
+    /// Reserves an [`Entity`] from an async context.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// use bevy_ecs::prelude::*;
+    ///
+    /// let mut world = World::new();
+    /// let remote = world.entities().get_remote();
+    ///
+    /// // The reservation is async, so we run it on a separate thread.
+    /// let thread = std::thread::spawn(move || {
+    ///     let future = async {
+    ///         for _ in 0..100 {
+    ///             remote.reserve_entity().await.unwrap();
+    ///         }
+    ///     };
+    ///     bevy_tasks::block_on(future);
+    /// });
+    ///
+    /// // We need to flush the entities as needed or the remote reservations will get stuck.
+    /// while !thread.is_finished() {
+    ///     world.flush();
+    /// }
+    /// ```
+    pub async fn reserve_entity(&self) -> Result<Entity, RemoteReservationError> {
+        self.inner.recent_requests.fetch_add(1, Ordering::Relaxed);
+        self.inner
+            .reserved
+            .recv()
+            .await
+            .map_err(|_| RemoteReservationError::Closed)
+    }
+
+    /// Returns true only if the [`Entities`] has discontinued this remote access.
+    pub fn is_closed(&self) -> bool {
+        self.inner.reserved.is_closed()
+    }
+}
+
 /// A [`World`]'s internal metadata store on all of its entities.
 ///
 /// Contains metadata on:
```
```diff
@@ -525,6 +613,10 @@ unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {}
 /// [`World`]: crate::world::World
 #[derive(Debug)]
 pub struct Entities {
+    /// This is the number of entities we keep ready for remote reservations via [`RemoteEntities::reserve_entity`].
+    /// A value too high can cause excess memory to be used, but a value too low can cause additional waiting.
+    pub entities_hot_for_remote: u32,
+    remote: Arc<RemoteEntitiesInner>,
```
**Contributor:** Nit: Could we just make this store a …

**Contributor (author):** We could do this, and if v6 doesn't end up evolving much, I'm on board with it. However, many of the ways we can improve this involve caching some atomic results in a non-atomic field; I used this a lot in v4. If v6 is merged, I'll start slowly moving concepts from v4 into it to try to improve performance. If none of those changes are merged, I can follow up by simplifying this per your suggestion. It's preemptive future-proofing, if that makes sense.
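For context, here is a minimal sketch of the caching pattern the author alludes to, with hypothetical names (none of this code is from the PR): an atomic shared counter is loaded once where `&mut` access is available and cached in a plain field for cheap repeated reads.

```rust
use core::sync::atomic::{AtomicU32, Ordering};

// Hypothetical illustration of caching an atomic result in a non-atomic field.
struct Counters {
    shared: AtomicU32, // updated concurrently through shared handles
    cached: u32,       // refreshed only where `&mut self` is available
}

impl Counters {
    /// Pay for the atomic load once, then read `cached` for free.
    fn refresh(&mut self) {
        self.cached = self.shared.load(Ordering::Relaxed);
    }
}

fn main() {
    let mut counters = Counters {
        shared: AtomicU32::new(7),
        cached: 0,
    };
    counters.refresh();
    assert_eq!(counters.cached, 7);
}
```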
```diff
     meta: Vec<EntityMeta>,

     /// The `pending` and `free_cursor` fields describe three sets of Entity IDs
```
```diff
@@ -572,11 +664,23 @@ pub struct Entities {
 }

 impl Entities {
-    pub(crate) const fn new() -> Self {
+    /// The default value of [`entities_hot_for_remote`](Self::entities_hot_for_remote).
+    pub const DEFAULT_HOT_REMOTE_ENTITIES: u32 = 256;
+
+    pub(crate) fn new() -> Self {
         Entities {
             meta: Vec::new(),
             pending: Vec::new(),
             free_cursor: AtomicIdCursor::new(0),
+            remote: Arc::new(RemoteEntitiesInner::new()),
+            entities_hot_for_remote: Self::DEFAULT_HOT_REMOTE_ENTITIES,
         }
     }
+
+    /// Constructs a new [`RemoteEntities`] for this instance.
+    pub fn get_remote(&self) -> RemoteEntities {
+        RemoteEntities {
+            inner: self.remote.clone(),
+        }
+    }
```
```diff
@@ -821,6 +925,8 @@ impl Entities {
         self.meta.clear();
         self.pending.clear();
         *self.free_cursor.get_mut() = 0;
+        self.remote.close();
```
**Contributor:** This will disconnect any outstanding … Although it probably doesn't matter, since nobody will call …

**Contributor (author):** I prefer this behavior. What if a clear happens while an asset is loading? Now we have to make sure every included entity is valid instead of just making sure one is. Right now, the minute it clears, all current remote reservers fail, which I think is a more transparent way of error handling. (But like you said, it doesn't really matter.)
```diff
+        self.remote = Arc::new(RemoteEntitiesInner::new());
     }

     /// Returns the location of an [`Entity`].
```
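A hedged sketch of the behavior being discussed, assuming the hunk above belongs to `Entities::clear` and a crate-internal test context (since `Entities::new` is `pub(crate)`): once a clear runs, every previously created `RemoteEntities` observes the closed channel.

```rust
// Sketch only: would live in a test inside bevy_ecs rather than user code.
let mut entities = Entities::new();
let remote = entities.get_remote();

entities.clear(); // closes the old channel and installs a fresh one

// Reservers created before the clear now fail transparently.
assert!(remote.is_closed());
let result = bevy_tasks::block_on(remote.reserve_entity());
assert_eq!(result, Err(RemoteReservationError::Closed));
```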
|
|
```diff
@@ -934,6 +1040,41 @@ impl Entities {
                 &mut meta.location,
             );
         }
+
+        if self.remote.recent_requests.load(Ordering::Relaxed) > 0 {
+            // TODO: add core::intrinsics::unlikely once stable
+            self.force_remote_fulfill(init);
+        }
+    }
+
+    // We do this to hint to the compiler that the `if` branch above is unlikely to be taken.
+    #[cold]
+    fn force_remote_fulfill(&mut self, init_allocated: impl FnMut(Entity, &mut EntityLocation)) {
+        let in_channel = self.entities_hot_for_remote;
+        let mut init_allocated = init_allocated;
+        let to_fulfill = self.remote.recent_requests.swap(0, Ordering::Relaxed);
+        let current_in_channel = self.remote.in_channel.load(Ordering::Relaxed);
```
**Contributor:** Another option here is to store the (signed) difference between …

```rust
if self.remote.net_in_channel.load(Ordering::Relaxed) < self.entities_hot_for_remote {
    // ...
    let old_net_in_channel = self.remote.net_in_channel
        .fetch_max(self.entities_hot_for_remote, Ordering::Relaxed);
    let to_fulfill = self.entities_hot_for_remote - old_net_in_channel;
    if to_fulfill > 0 {
```

**Contributor (author):** I suppose we could, but I'm not sure what the motivation would be. IIUC, it's the same number of atomic ops and the same amount of data stored. Actually, it's 8 bytes instead of 4, but still. I'm not opposed to it, though, if there's a reason to do this that I'm just missing.

**Contributor:** Oh, I was just trying to simplify the …
```diff
+        let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel); // should_reserve = to_fulfill + (in_channel - current_in_channel)
+        let new_in_channel = current_in_channel + should_reserve - to_fulfill; // new_in_channel = current_in_channel + (should_reserve - to_fulfill)
+        self.remote
+            .in_channel
+            .store(new_in_channel, Ordering::Relaxed);
+
+        self.reserve(should_reserve);
+        for _ in 0..should_reserve {
+            let entity = self.alloc();
+            // SAFETY: we just allocated it
+            let loc = unsafe {
+                &mut self
+                    .meta
+                    .get_unchecked_mut(entity.index() as usize)
+                    .location
+            };
+            init_allocated(entity, loc);
+            let result = self.remote.reserver.try_send(entity);
+            // It should not be closed and it can't get full.
+            debug_assert!(result.is_ok());
+        }
+    }

     /// Flushes all reserved entities to an "invalid" state. Attempting to retrieve them will return `None`
```
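To make the accounting in `force_remote_fulfill` concrete, here is a worked example with illustrative numbers (not from the PR):

```rust
fn main() {
    // Target 256 hot entities, 200 currently buffered in the channel,
    // and 50 remote requests arrived since the last flush.
    let in_channel: u32 = 256; // entities_hot_for_remote
    let current_in_channel: u32 = 200;
    let to_fulfill: u32 = 50;

    // Reserve enough to serve all 50 requests and refill the buffer to 256.
    let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel);
    assert_eq!(should_reserve, 106); // 50 to serve + 56 to top the channel up

    // After fulfilling, the channel sits at its target size again.
    let new_in_channel = current_in_channel + should_reserve - to_fulfill;
    assert_eq!(new_in_channel, 256);
}
```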
|
|
```diff
@@ -1035,6 +1176,13 @@ impl Entities {
     }
 }

+impl Drop for Entities {
+    fn drop(&mut self) {
+        // Make sure remote entities are informed.
+        self.remote.close();
+    }
+}
+
 /// An error that occurs when a specified [`Entity`] does not exist.
 #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
 #[error("The entity with ID {entity} {details}")]
```
```diff
@@ -1217,6 +1365,86 @@ mod tests {
         assert!(next_entity.generation() > entity.generation() + GENERATIONS);
     }

+    #[cfg(feature = "std")]
+    fn test_remote_reservation<const RAND: bool>(entities: &mut Entities) {
+        use bevy_tasks::block_on;
+        use rand::{rngs::StdRng, Rng, SeedableRng};
+        use std::thread;
+
+        let mut rng = StdRng::seed_from_u64(89274528);
+
+        let mut threads = (0..3)
+            .map(|_| {
+                let reserver = entities.get_remote();
+                thread::spawn(move || {
+                    let future = async {
+                        for _ in 0..100 {
+                            reserver.reserve_entity().await.unwrap();
+                        }
+                    };
+                    block_on(future);
+                })
+            })
+            .collect::<Vec<_>>();
+
+        let timeout = std::time::Instant::now();
+        loop {
+            threads.retain(|thread| !thread.is_finished());
+            entities.flush_as_invalid();
+            if RAND {
+                entities.entities_hot_for_remote = rng.r#gen::<u32>() & 127;
+            }
+            if threads.is_empty() {
+                break;
+            }
+            if timeout.elapsed().as_secs() > 60 {
+                panic!("remote entities timed out.")
+            }
+        }
+
+        assert_eq!(
+            entities.remote.in_channel.load(Ordering::Relaxed),
+            entities.remote.reserved.len() as u32
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn remote_reservation_empty_hot() {
+        let mut entities = Entities::new();
+        entities.entities_hot_for_remote = 0;
+        test_remote_reservation::<false>(&mut entities);
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn remote_reservation_standard_hot() {
+        let mut entities = Entities::new();
+        // Lower batch size so more waiting is tested.
+        entities.entities_hot_for_remote = 16;
+        test_remote_reservation::<false>(&mut entities);
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn remote_reservation_frequently_changed_hot() {
+        let mut entities = Entities::new();
+
+        entities.entities_hot_for_remote = 16;
+        test_remote_reservation::<false>(&mut entities);
+        test_remote_reservation::<true>(&mut entities);
+
+        // Higher batch size so larger channels are tested.
+        entities.entities_hot_for_remote = 1024;
+        test_remote_reservation::<false>(&mut entities);
+        test_remote_reservation::<true>(&mut entities);
+
+        entities.entities_hot_for_remote = 1024;
+        test_remote_reservation::<false>(&mut entities);
+        entities.entities_hot_for_remote = 0;
+        test_remote_reservation::<false>(&mut entities);
+    }
+
     #[test]
     #[expect(
         clippy::nonminimal_bool,
```
|