Skip to content

Commit

Permalink
Add a dispatcher that can be sent between threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Imberflur committed Jan 2, 2024
1 parent 5450bf3 commit 6eee56c
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 3 deletions.
32 changes: 31 additions & 1 deletion src/dispatch/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use smallvec::SmallVec;

use crate::{dispatch::stage::Stage, system::RunNow, world::World};
use crate::{
dispatch::{stage::Stage, SendDispatcher},
system::RunNow,
world::World,
};

/// This wrapper is used to share a replaceable ThreadPool with other
/// dispatchers. Useful with batch dispatchers.
Expand Down Expand Up @@ -114,6 +118,32 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
}
}

/// Converts this to a [`SendDispatcher`].
///
/// Fails and returns the original distpatcher if it contains thread local systems.
pub fn try_into_sendable(self) -> Result<SendDispatcher<'a>, Self> {
let Dispatcher {
stages,
thread_local,
thread_pool,

Check failure on line 128 in src/dispatch/dispatcher.rs

View workflow job for this annotation

GitHub Actions / Build and Test (Linux) (nightly)

struct `Dispatcher` does not have a field named `thread_pool`

Check failure on line 128 in src/dispatch/dispatcher.rs

View workflow job for this annotation

GitHub Actions / Build and Test (Linux) (beta)

struct `Dispatcher` does not have a field named `thread_pool`

Check failure on line 128 in src/dispatch/dispatcher.rs

View workflow job for this annotation

GitHub Actions / Build and Test (Linux) (stable)

struct `Dispatcher` does not have a field named `thread_pool`
} = self;

if thread_local.is_empty() {
Ok(SendDispatcher {
stages,
#[cfg(feature = "parallel")]
thread_pool,
})
} else {
Err(Dispatcher {
stages,
thread_local,
#[cfg(feature = "parallel")]
thread_pool,
})
}
}

/// This method returns the largest amount of threads this dispatcher
/// can make use of. This is mainly for debugging purposes so you can see
/// how well your systems can make use of multi-threading.
Expand Down
2 changes: 2 additions & 0 deletions src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use self::{
},
builder::DispatcherBuilder,
dispatcher::Dispatcher,
send_dispatcher::SendDispatcher,
};

#[cfg(feature = "parallel")]
Expand All @@ -18,5 +19,6 @@ mod builder;
mod dispatcher;
#[cfg(feature = "parallel")]
mod par_seq;
mod send_dispatcher;
mod stage;
mod util;
130 changes: 130 additions & 0 deletions src/dispatch/send_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use crate::{
dispatch::{dispatcher::ThreadPoolWrapper, stage::Stage},

Check failure on line 2 in src/dispatch/send_dispatcher.rs

View workflow job for this annotation

GitHub Actions / Build and Test (Linux) (nightly)

unresolved import `crate::dispatch::dispatcher::ThreadPoolWrapper`

Check failure on line 2 in src/dispatch/send_dispatcher.rs

View workflow job for this annotation

GitHub Actions / Build and Test (Linux) (beta)

unresolved import `crate::dispatch::dispatcher::ThreadPoolWrapper`

Check failure on line 2 in src/dispatch/send_dispatcher.rs

View workflow job for this annotation

GitHub Actions / Build and Test (Linux) (stable)

unresolved import `crate::dispatch::dispatcher::ThreadPoolWrapper`
system::RunNow,
world::World,
};

/// `Send`able version of [`Dispatcher`](crate::dispatch::Dispatcher).
///
/// Can't hold thread local systems.
///
/// Create using [`Dispatcher::try_into_sendable`](crate::dispatch::Dispatcher::try_into_sendable).
pub struct SendDispatcher<'a> {
pub(super) stages: Vec<Stage<'a>>,
#[cfg(feature = "parallel")]
pub(super) thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
}

impl<'a> SendDispatcher<'a> {
/// Sets up all the systems which means they are gonna add default values
/// for the resources they need.
pub fn setup(&mut self, world: &mut World) {
for stage in &mut self.stages {
stage.setup(world);
}
}

/// Calls the `dispose` method of all systems and allows them to release
/// external resources. It is common this method removes components and
/// / or resources from the `World` which are associated with external
/// resources.
pub fn dispose(self, world: &mut World) {
for stage in self.stages {
stage.dispose(world);
}
}

/// Dispatch all the systems with given resources and context
/// and then run thread local systems.
///
/// This function automatically redirects to
///
/// * [SendDispatcher::dispatch_par] in case it is supported
/// * [SendDispatcher::dispatch_seq] otherwise
///
/// and runs `dispatch_thread_local` afterwards.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch(&mut self, world: &World) {
#[cfg(feature = "parallel")]
self.dispatch_par(world);

#[cfg(not(feature = "parallel"))]
self.dispatch_seq(world);
}

/// Dispatches the systems (except thread local systems)
/// in parallel given the resources to operate on.
///
/// This operation blocks the
/// executing thread.
///
/// Only available with "parallel" feature enabled.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
#[cfg(feature = "parallel")]
pub fn dispatch_par(&mut self, world: &World) {
let stages = &mut self.stages;

self.thread_pool
.read()
.unwrap()
.as_ref()
.unwrap()
.install(move || {
for stage in stages {
stage.execute(world);
}
});
}

/// Dispatches the systems (except thread local systems) sequentially.
///
/// This is useful if parallel overhead is
/// too big or the platform does not support multithreading.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch_seq(&mut self, world: &World) {
for stage in &mut self.stages {
stage.execute_seq(world);
}
}

/// This method returns the largest amount of threads this dispatcher
/// can make use of. This is mainly for debugging purposes so you can see
/// how well your systems can make use of multi-threading.
#[cfg(feature = "parallel")]
pub fn max_threads(&self) -> usize {
self.stages
.iter()
.map(Stage::max_threads)
.max()
.unwrap_or(0)
}
}

impl<'a, 'b> RunNow<'a> for SendDispatcher<'b> {
fn run_now(&mut self, world: &World) {
self.dispatch(world);
}

fn setup(&mut self, world: &mut World) {
self.setup(world);
}

fn dispose(self: Box<Self>, world: &mut World) {
(*self).dispose(world);
}
}

#[cfg(test)]
mod tests {
#[test]
fn send_dispatcher_is_send() {
fn is_send<T: Send>() {}
is_send::<super::SendDispatcher>();
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub use crate::dispatch::{Par, ParSeq, RunWithPool, Seq};
pub use crate::{
dispatch::{
BatchAccessor, BatchController, BatchUncheckedWorld, Dispatcher, DispatcherBuilder,
MultiDispatchController, MultiDispatcher,
MultiDispatchController, MultiDispatcher, SendDispatcher,
},
meta::{CastFrom, MetaIter, MetaIterMut, MetaTable},
system::{
Expand Down
2 changes: 1 addition & 1 deletion src/world/res_downcast/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Code is based on https://github.com/chris-morgan/mopa
//! Code is based on <https://github.com/chris-morgan/mopa>
//! with the macro inlined for `Resource`. License files can be found in the
//! directory of this source file, see COPYRIGHT, LICENSE-APACHE and
//! LICENSE-MIT.
Expand Down

0 comments on commit 6eee56c

Please sign in to comment.