diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 220f59f..eefc25b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,4 +73,6 @@ jobs: # Miri currently reports leaks in some tests so we disable that check # here (might be due to ptr-int-ptr in crossbeam-epoch so might be # resolved in future versions of that crate). - run: MIRIFLAGS="-Zmiri-ignore-leaks" cargo miri test + # + # crossbeam-epoch doesn't pass with stacked borrows https://github.com/crossbeam-rs/crossbeam/issues/545 + run: MIRIFLAGS="-Zmiri-ignore-leaks -Zmiri-tree-borrows" cargo miri test diff --git a/src/dispatch/dispatcher.rs b/src/dispatch/dispatcher.rs index d4b4fd1..e6f064c 100644 --- a/src/dispatch/dispatcher.rs +++ b/src/dispatch/dispatcher.rs @@ -1,6 +1,10 @@ use smallvec::SmallVec; -use crate::{dispatch::stage::Stage, system::RunNow, world::World}; +use crate::{ + dispatch::{stage::Stage, SendDispatcher}, + system::RunNow, + world::World, +}; /// This wrapper is used to share a replaceable ThreadPool with other /// dispatchers. Useful with batch dispatchers. @@ -10,19 +14,15 @@ pub type ThreadPoolWrapper = Option<::std::sync::Arc<::rayon::ThreadPool>>; /// The dispatcher struct, allowing /// systems to be executed in parallel. pub struct Dispatcher<'a, 'b> { - stages: Vec>, + inner: SendDispatcher<'a>, thread_local: ThreadLocal<'b>, - #[cfg(feature = "parallel")] - thread_pool: ::std::sync::Arc<::std::sync::RwLock>, } impl<'a, 'b> Dispatcher<'a, 'b> { /// Sets up all the systems which means they are gonna add default values /// for the resources they need. pub fn setup(&mut self, world: &mut World) { - for stage in &mut self.stages { - stage.setup(world); - } + self.inner.setup(world); for sys in &mut self.thread_local { sys.setup(world); @@ -34,9 +34,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> { /// / or resources from the `World` which are associated with external /// resources. pub fn dispose(self, world: &mut World) { - for stage in self.stages { - stage.dispose(world); - } + self.inner.dispose(world); for sys in self.thread_local { sys.dispose(world); @@ -56,12 +54,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> { /// Please note that this method assumes that no resource /// is currently borrowed. If that's the case, it panics. pub fn dispatch(&mut self, world: &World) { - #[cfg(feature = "parallel")] - self.dispatch_par(world); - - #[cfg(not(feature = "parallel"))] - self.dispatch_seq(world); - + self.inner.dispatch(world); self.dispatch_thread_local(world); } @@ -77,18 +70,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> { /// is currently borrowed. If that's the case, it panics. #[cfg(feature = "parallel")] pub fn dispatch_par(&mut self, world: &World) { - let stages = &mut self.stages; - - self.thread_pool - .read() - .unwrap() - .as_ref() - .unwrap() - .install(move || { - for stage in stages { - stage.execute(world); - } - }); + self.inner.dispatch_par(world); } /// Dispatches the systems (except thread local systems) sequentially. @@ -99,9 +81,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> { /// Please note that this method assumes that no resource /// is currently borrowed. If that's the case, it panics. pub fn dispatch_seq(&mut self, world: &World) { - for stage in &mut self.stages { - stage.execute_seq(world); - } + self.inner.dispatch_seq(world); } /// Dispatch only thread local systems sequentially. @@ -114,16 +94,28 @@ impl<'a, 'b> Dispatcher<'a, 'b> { } } + /// Converts this to a [`SendDispatcher`]. + /// + /// Fails and returns the original distpatcher if it contains thread local systems. + pub fn try_into_sendable(self) -> Result, Self> { + let Dispatcher { + inner: _, + thread_local, + } = &self; + + if thread_local.is_empty() { + Ok(self.inner) + } else { + Err(self) + } + } + /// This method returns the largest amount of threads this dispatcher /// can make use of. This is mainly for debugging purposes so you can see /// how well your systems can make use of multi-threading. #[cfg(feature = "parallel")] pub fn max_threads(&self) -> usize { - self.stages - .iter() - .map(Stage::max_threads) - .max() - .unwrap_or(0) + self.inner.max_threads() } } @@ -154,9 +146,11 @@ pub fn new_dispatcher<'a, 'b>( thread_pool: ::std::sync::Arc<::std::sync::RwLock>, ) -> Dispatcher<'a, 'b> { Dispatcher { - stages, + inner: SendDispatcher { + stages, + thread_pool, + }, thread_local, - thread_pool, } } @@ -166,7 +160,7 @@ pub fn new_dispatcher<'a, 'b>( thread_local: ThreadLocal<'b>, ) -> Dispatcher<'a, 'b> { Dispatcher { - stages, + inner: SendDispatcher { stages }, thread_local, } } diff --git a/src/dispatch/mod.rs b/src/dispatch/mod.rs index 1734d19..aaed663 100644 --- a/src/dispatch/mod.rs +++ b/src/dispatch/mod.rs @@ -9,6 +9,7 @@ pub use self::{ }, builder::DispatcherBuilder, dispatcher::Dispatcher, + send_dispatcher::SendDispatcher, }; #[cfg(feature = "parallel")] @@ -18,5 +19,6 @@ mod builder; mod dispatcher; #[cfg(feature = "parallel")] mod par_seq; +mod send_dispatcher; mod stage; mod util; diff --git a/src/dispatch/send_dispatcher.rs b/src/dispatch/send_dispatcher.rs new file mode 100644 index 0000000..6edcb9b --- /dev/null +++ b/src/dispatch/send_dispatcher.rs @@ -0,0 +1,128 @@ +#[cfg(feature = "parallel")] +use crate::dispatch::dispatcher::ThreadPoolWrapper; +use crate::{dispatch::stage::Stage, system::RunNow, world::World}; + +/// `Send`able version of [`Dispatcher`](crate::dispatch::Dispatcher). +/// +/// Can't hold thread local systems. +/// +/// Create using [`Dispatcher::try_into_sendable`](crate::dispatch::Dispatcher::try_into_sendable). +pub struct SendDispatcher<'a> { + pub(super) stages: Vec>, + #[cfg(feature = "parallel")] + pub(super) thread_pool: ::std::sync::Arc<::std::sync::RwLock>, +} + +impl<'a> SendDispatcher<'a> { + /// Sets up all the systems which means they are gonna add default values + /// for the resources they need. + pub fn setup(&mut self, world: &mut World) { + for stage in &mut self.stages { + stage.setup(world); + } + } + + /// Calls the `dispose` method of all systems and allows them to release + /// external resources. It is common this method removes components and + /// / or resources from the `World` which are associated with external + /// resources. + pub fn dispose(self, world: &mut World) { + for stage in self.stages { + stage.dispose(world); + } + } + + /// Dispatch all the systems with given resources and context + /// and then run thread local systems. + /// + /// This function automatically redirects to + /// + /// * [SendDispatcher::dispatch_par] in case it is supported + /// * [SendDispatcher::dispatch_seq] otherwise + /// + /// and runs `dispatch_thread_local` afterwards. + /// + /// Please note that this method assumes that no resource + /// is currently borrowed. If that's the case, it panics. + pub fn dispatch(&mut self, world: &World) { + #[cfg(feature = "parallel")] + self.dispatch_par(world); + + #[cfg(not(feature = "parallel"))] + self.dispatch_seq(world); + } + + /// Dispatches the systems (except thread local systems) + /// in parallel given the resources to operate on. + /// + /// This operation blocks the + /// executing thread. + /// + /// Only available with "parallel" feature enabled. + /// + /// Please note that this method assumes that no resource + /// is currently borrowed. If that's the case, it panics. + #[cfg(feature = "parallel")] + pub fn dispatch_par(&mut self, world: &World) { + let stages = &mut self.stages; + + self.thread_pool + .read() + .unwrap() + .as_ref() + .unwrap() + .install(move || { + for stage in stages { + stage.execute(world); + } + }); + } + + /// Dispatches the systems (except thread local systems) sequentially. + /// + /// This is useful if parallel overhead is + /// too big or the platform does not support multithreading. + /// + /// Please note that this method assumes that no resource + /// is currently borrowed. If that's the case, it panics. + pub fn dispatch_seq(&mut self, world: &World) { + for stage in &mut self.stages { + stage.execute_seq(world); + } + } + + /// This method returns the largest amount of threads this dispatcher + /// can make use of. This is mainly for debugging purposes so you can see + /// how well your systems can make use of multi-threading. + #[cfg(feature = "parallel")] + pub fn max_threads(&self) -> usize { + self.stages + .iter() + .map(Stage::max_threads) + .max() + .unwrap_or(0) + } +} + +impl<'a, 'b> RunNow<'a> for SendDispatcher<'b> { + fn run_now(&mut self, world: &World) { + self.dispatch(world); + } + + fn setup(&mut self, world: &mut World) { + self.setup(world); + } + + fn dispose(self: Box, world: &mut World) { + (*self).dispose(world); + } +} + +#[cfg(test)] +mod tests { + #[test] + fn send_dispatcher_is_send() { + fn is_send() {} + is_send::(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 7857e6d..1c0c96b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,7 +110,7 @@ pub use crate::dispatch::{Par, ParSeq, RunWithPool, Seq}; pub use crate::{ dispatch::{ BatchAccessor, BatchController, BatchUncheckedWorld, Dispatcher, DispatcherBuilder, - MultiDispatchController, MultiDispatcher, + MultiDispatchController, MultiDispatcher, SendDispatcher, }, meta::{CastFrom, MetaIter, MetaIterMut, MetaTable}, system::{ diff --git a/src/world/res_downcast/mod.rs b/src/world/res_downcast/mod.rs index c0dbb7e..065034c 100644 --- a/src/world/res_downcast/mod.rs +++ b/src/world/res_downcast/mod.rs @@ -1,4 +1,4 @@ -//! Code is based on https://github.com/chris-morgan/mopa +//! Code is based on //! with the macro inlined for `Resource`. License files can be found in the //! directory of this source file, see COPYRIGHT, LICENSE-APACHE and //! LICENSE-MIT.