From 6eee56c264e7ca2a9d26ff0805afbe4bb6629d1d Mon Sep 17 00:00:00 2001 From: Imbris Date: Mon, 1 Jan 2024 17:10:50 -0500 Subject: [PATCH] Add a dispatcher that can be sent between threads --- src/dispatch/dispatcher.rs | 32 +++++++- src/dispatch/mod.rs | 2 + src/dispatch/send_dispatcher.rs | 130 ++++++++++++++++++++++++++++++++ src/lib.rs | 2 +- src/world/res_downcast/mod.rs | 2 +- 5 files changed, 165 insertions(+), 3 deletions(-) create mode 100644 src/dispatch/send_dispatcher.rs diff --git a/src/dispatch/dispatcher.rs b/src/dispatch/dispatcher.rs index d4b4fd16..ffb3f5c8 100644 --- a/src/dispatch/dispatcher.rs +++ b/src/dispatch/dispatcher.rs @@ -1,6 +1,10 @@ use smallvec::SmallVec; -use crate::{dispatch::stage::Stage, system::RunNow, world::World}; +use crate::{ + dispatch::{stage::Stage, SendDispatcher}, + system::RunNow, + world::World, +}; /// This wrapper is used to share a replaceable ThreadPool with other /// dispatchers. Useful with batch dispatchers. @@ -114,6 +118,32 @@ impl<'a, 'b> Dispatcher<'a, 'b> { } } + /// Converts this to a [`SendDispatcher`]. + /// + /// Fails and returns the original distpatcher if it contains thread local systems. + pub fn try_into_sendable(self) -> Result, Self> { + let Dispatcher { + stages, + thread_local, + thread_pool, + } = self; + + if thread_local.is_empty() { + Ok(SendDispatcher { + stages, + #[cfg(feature = "parallel")] + thread_pool, + }) + } else { + Err(Dispatcher { + stages, + thread_local, + #[cfg(feature = "parallel")] + thread_pool, + }) + } + } + /// This method returns the largest amount of threads this dispatcher /// can make use of. This is mainly for debugging purposes so you can see /// how well your systems can make use of multi-threading. diff --git a/src/dispatch/mod.rs b/src/dispatch/mod.rs index 1734d191..aaed663a 100644 --- a/src/dispatch/mod.rs +++ b/src/dispatch/mod.rs @@ -9,6 +9,7 @@ pub use self::{ }, builder::DispatcherBuilder, dispatcher::Dispatcher, + send_dispatcher::SendDispatcher, }; #[cfg(feature = "parallel")] @@ -18,5 +19,6 @@ mod builder; mod dispatcher; #[cfg(feature = "parallel")] mod par_seq; +mod send_dispatcher; mod stage; mod util; diff --git a/src/dispatch/send_dispatcher.rs b/src/dispatch/send_dispatcher.rs new file mode 100644 index 00000000..7dc6ce9b --- /dev/null +++ b/src/dispatch/send_dispatcher.rs @@ -0,0 +1,130 @@ +use crate::{ + dispatch::{dispatcher::ThreadPoolWrapper, stage::Stage}, + system::RunNow, + world::World, +}; + +/// `Send`able version of [`Dispatcher`](crate::dispatch::Dispatcher). +/// +/// Can't hold thread local systems. +/// +/// Create using [`Dispatcher::try_into_sendable`](crate::dispatch::Dispatcher::try_into_sendable). +pub struct SendDispatcher<'a> { + pub(super) stages: Vec>, + #[cfg(feature = "parallel")] + pub(super) thread_pool: ::std::sync::Arc<::std::sync::RwLock>, +} + +impl<'a> SendDispatcher<'a> { + /// Sets up all the systems which means they are gonna add default values + /// for the resources they need. + pub fn setup(&mut self, world: &mut World) { + for stage in &mut self.stages { + stage.setup(world); + } + } + + /// Calls the `dispose` method of all systems and allows them to release + /// external resources. It is common this method removes components and + /// / or resources from the `World` which are associated with external + /// resources. + pub fn dispose(self, world: &mut World) { + for stage in self.stages { + stage.dispose(world); + } + } + + /// Dispatch all the systems with given resources and context + /// and then run thread local systems. + /// + /// This function automatically redirects to + /// + /// * [SendDispatcher::dispatch_par] in case it is supported + /// * [SendDispatcher::dispatch_seq] otherwise + /// + /// and runs `dispatch_thread_local` afterwards. + /// + /// Please note that this method assumes that no resource + /// is currently borrowed. If that's the case, it panics. + pub fn dispatch(&mut self, world: &World) { + #[cfg(feature = "parallel")] + self.dispatch_par(world); + + #[cfg(not(feature = "parallel"))] + self.dispatch_seq(world); + } + + /// Dispatches the systems (except thread local systems) + /// in parallel given the resources to operate on. + /// + /// This operation blocks the + /// executing thread. + /// + /// Only available with "parallel" feature enabled. + /// + /// Please note that this method assumes that no resource + /// is currently borrowed. If that's the case, it panics. + #[cfg(feature = "parallel")] + pub fn dispatch_par(&mut self, world: &World) { + let stages = &mut self.stages; + + self.thread_pool + .read() + .unwrap() + .as_ref() + .unwrap() + .install(move || { + for stage in stages { + stage.execute(world); + } + }); + } + + /// Dispatches the systems (except thread local systems) sequentially. + /// + /// This is useful if parallel overhead is + /// too big or the platform does not support multithreading. + /// + /// Please note that this method assumes that no resource + /// is currently borrowed. If that's the case, it panics. + pub fn dispatch_seq(&mut self, world: &World) { + for stage in &mut self.stages { + stage.execute_seq(world); + } + } + + /// This method returns the largest amount of threads this dispatcher + /// can make use of. This is mainly for debugging purposes so you can see + /// how well your systems can make use of multi-threading. + #[cfg(feature = "parallel")] + pub fn max_threads(&self) -> usize { + self.stages + .iter() + .map(Stage::max_threads) + .max() + .unwrap_or(0) + } +} + +impl<'a, 'b> RunNow<'a> for SendDispatcher<'b> { + fn run_now(&mut self, world: &World) { + self.dispatch(world); + } + + fn setup(&mut self, world: &mut World) { + self.setup(world); + } + + fn dispose(self: Box, world: &mut World) { + (*self).dispose(world); + } +} + +#[cfg(test)] +mod tests { + #[test] + fn send_dispatcher_is_send() { + fn is_send() {} + is_send::(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 7857e6dc..1c0c96b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,7 +110,7 @@ pub use crate::dispatch::{Par, ParSeq, RunWithPool, Seq}; pub use crate::{ dispatch::{ BatchAccessor, BatchController, BatchUncheckedWorld, Dispatcher, DispatcherBuilder, - MultiDispatchController, MultiDispatcher, + MultiDispatchController, MultiDispatcher, SendDispatcher, }, meta::{CastFrom, MetaIter, MetaIterMut, MetaTable}, system::{ diff --git a/src/world/res_downcast/mod.rs b/src/world/res_downcast/mod.rs index c0dbb7e0..065034ca 100644 --- a/src/world/res_downcast/mod.rs +++ b/src/world/res_downcast/mod.rs @@ -1,4 +1,4 @@ -//! Code is based on https://github.com/chris-morgan/mopa +//! Code is based on //! with the macro inlined for `Resource`. License files can be found in the //! directory of this source file, see COPYRIGHT, LICENSE-APACHE and //! LICENSE-MIT.