diff --git a/Cargo.toml b/Cargo.toml index 583d64b9..d257cecc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,10 @@ keywords = ["actors", "actor-model", "async", "cqrs", "event_sourcing"] [badges] travis-ci = { repository = "riker-rs/riker" } +[features] +default = [] +tokio_executor = ["tokio"] + [dependencies] riker-macros = { path = "riker-macros", version = "0.2.0" } chrono = "0.4" @@ -27,8 +31,12 @@ slog-stdlog = "4.0" slog-scope = "4.3.0" num_cpus = "1.13.0" dashmap = "3" +tokio = { version = "^1", features = ["rt-multi-thread", "macros", "time"], optional = true } [dev-dependencies] -riker-testkit = "0.1.0" log = "0.4" + +#[dev-dependencies.riker-testkit] +#git = "https://github.com/mankinskin/riker-testkit" +#branch = "tokio_executor" diff --git a/riker-macros/Cargo.toml b/riker-macros/Cargo.toml index 6ffd84c6..d9111a7b 100644 --- a/riker-macros/Cargo.toml +++ b/riker-macros/Cargo.toml @@ -13,10 +13,18 @@ keywords = ["actors", "actor-model", "async", "cqrs", "event_sourcing"] [lib] proc-macro = true +[features] +default = [] + [dependencies] syn = { version ="1.0", features = ["parsing", "full", "extra-traits", "proc-macro"] } quote = "1.0" proc-macro2 = "1.0" +tokio = { version = "^1", features = ["rt-multi-thread", "macros", "time"], optional = true } [dev-dependencies] riker = { path = ".." } + +[dev-dependencies.riker-testkit] +git = "https://github.com/mankinskin/riker-testkit" +branch = "tokio_executor" diff --git a/riker-macros/tests/macro.rs b/riker-macros/tests/macro.rs index 4f5b4966..7e3da1b2 100644 --- a/riker-macros/tests/macro.rs +++ b/riker-macros/tests/macro.rs @@ -33,7 +33,7 @@ impl Receive for NewActor { } } -#[test] +#[riker_testkit::test] fn run_derived_actor() { let sys = ActorSystem::new().unwrap(); @@ -45,6 +45,9 @@ fn run_derived_actor() { // wait until all direct children of the user root are terminated while sys.user_root().has_children() { // in order to lower cpu usage, sleep here + #[cfg(feature = "tokio_executor")] + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + #[cfg(not(feature = "tokio_executor"))] std::thread::sleep(std::time::Duration::from_millis(50)); } } @@ -80,7 +83,7 @@ impl Receive for GenericActor> for GenericMsgActor { } } -#[test] +#[riker_testkit::test] fn run_generic_message_actor() { let sys = ActorSystem::new().unwrap(); @@ -145,6 +151,9 @@ fn run_generic_message_actor() { // wait until all direct children of the user root are terminated while sys.user_root().has_children() { // in order to lower cpu usage, sleep here + #[cfg(feature = "tokio_executor")] + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + #[cfg(not(feature = "tokio_executor"))] std::thread::sleep(std::time::Duration::from_millis(50)); } } @@ -202,7 +211,7 @@ impl Receive for PathMsgActor { } } -#[test] +#[riker_testkit::test] fn run_path_message_actor() { let sys = ActorSystem::new().unwrap(); @@ -219,6 +228,9 @@ fn run_path_message_actor() { // wait until all direct children of the user root are terminated while sys.user_root().has_children() { // in order to lower cpu usage, sleep here + #[cfg(feature = "tokio_executor")] + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + #[cfg(not(feature = "tokio_executor"))] std::thread::sleep(std::time::Duration::from_millis(50)); } } diff --git a/src/actor.rs b/src/actor.rs index b503439a..d8cb7b67 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -210,58 +210,63 @@ impl Actor for Box { /// attribute macro and implemented for each message type to receive. /// /// # Examples -/// -/// ``` -/// # use riker::actors::*; -/// -/// #[derive(Clone, Debug)] -/// pub struct Foo; -/// #[derive(Clone, Debug)] -/// pub struct Bar; -/// #[actor(Foo, Bar)] // <-- set our actor to receive Foo and Bar types -/// #[derive(Default)] -/// struct MyActor; -/// -/// impl Actor for MyActor { -/// type Msg = MyActorMsg; // <-- MyActorMsg is provided for us -/// -/// fn recv(&mut self, -/// ctx: &Context, -/// msg: Self::Msg, -/// sender: Sender) { -/// self.receive(ctx, msg, sender); // <-- call the respective implementation -/// } -/// } -/// -/// impl Receive for MyActor { -/// type Msg = MyActorMsg; -/// -/// fn receive(&mut self, -/// ctx: &Context, -/// msg: Foo, // <-- receive Foo -/// sender: Sender) { -/// println!("Received a Foo"); -/// } -/// } -/// -/// impl Receive for MyActor { -/// type Msg = MyActorMsg; -/// -/// fn receive(&mut self, -/// ctx: &Context, -/// msg: Bar, // <-- receive Bar -/// sender: Sender) { -/// println!("Received a Bar"); -/// } -/// } -/// -/// // main -/// let sys = ActorSystem::new().unwrap(); -/// let actor = sys.actor_of::("my-actor").unwrap(); -/// -/// actor.tell(Foo, None); -/// actor.tell(Bar, None); -/// ``` +#[cfg_attr( + not(feature = "tokio_executor"), + doc = r##" +``` +# use riker::actors::*; + +#[derive(Clone, Debug)] +pub struct Foo; +#[derive(Clone, Debug)] +pub struct Bar; +#[actor(Foo, Bar)] // <-- set our actor to receive Foo and Bar types +#[derive(Default)] +struct MyActor; + +impl Actor for MyActor { + type Msg = MyActorMsg; // <-- MyActorMsg is provided for us + + fn recv(&mut self, + ctx: &Context, + msg: Self::Msg, + sender: Sender) { + self.receive(ctx, msg, sender); // <-- call the respective implementation + } +} + +impl Receive for MyActor { + type Msg = MyActorMsg; + + fn receive(&mut self, + ctx: &Context, + msg: Foo, // <-- receive Foo + sender: Sender) { + println!("Received a Foo"); + } +} + +impl Receive for MyActor { + type Msg = MyActorMsg; + + fn receive(&mut self, + ctx: &Context, + msg: Bar, // <-- receive Bar + sender: Sender) { + println!("Received a Bar"); + } +} + +fn main() { + let sys = ActorSystem::new().unwrap(); + let actor = sys.actor_of::("my-actor").unwrap(); + + actor.tell(Foo, None); + actor.tell(Bar, None); +} +``` +"## +)] pub trait Receive { type Msg: Message; diff --git a/src/actor/actor_cell.rs b/src/actor/actor_cell.rs index ff541beb..dde28630 100644 --- a/src/actor/actor_cell.rs +++ b/src/actor/actor_cell.rs @@ -10,11 +10,12 @@ use std::{ use chrono::prelude::*; use dashmap::DashMap; -use futures::{future::RemoteHandle, task::SpawnError, Future}; +use futures::Future; use uuid::Uuid; use crate::{ actor::{props::ActorFactory, *}, + executor::TaskHandle, kernel::{ kernel_ref::{dispatch, dispatch_any, KernelRef}, mailbox::{AnyEnqueueError, AnySender, MailboxSender}, @@ -525,7 +526,10 @@ impl Run for Context where Msg: Message, { - fn run(&self, future: Fut) -> Result::Output>, SpawnError> + fn run( + &self, + future: Fut, + ) -> Result::Output>, Box> where Fut: Future + Send + 'static, ::Output: Send, diff --git a/src/actor/props.rs b/src/actor/props.rs index da9615d6..0e714427 100644 --- a/src/actor/props.rs +++ b/src/actor/props.rs @@ -17,33 +17,40 @@ use crate::actor::Actor; pub struct Props; impl Props { + #[cfg_attr( + not(feature = "tokio_executor"), + doc = r##" /// Creates an `ActorProducer` with no factory method parameters. /// /// # Examples /// - /// ``` - /// # use riker::actors::*; - /// - /// struct User; - /// - /// impl User { - /// fn actor() -> Self { - /// User - /// } - /// } - /// - /// # impl Actor for User { - /// # type Msg = String; - /// # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} - /// # } - /// // main - /// let sys = ActorSystem::new().unwrap(); - /// - /// let props = Props::new_from(User::actor); - /// - /// // start the actor and get an `ActorRef` - /// let actor = sys.actor_of_props("user", props).unwrap(); - /// ``` + ``` + # use riker::actors::*; + + struct User; + + impl User { + fn actor() -> Self { + User + } + } + + # impl Actor for User { + # type Msg = String; + # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} + # } + + fn main() { + let sys = ActorSystem::new().unwrap(); + + let props = Props::new_from(User::actor); + + // start the actor and get an `ActorRef` + let actor = sys.actor_of_props("user", props).unwrap(); + } + ``` + "## + )] #[inline] pub fn new_from(creator: F) -> Arc>> where @@ -53,67 +60,76 @@ impl Props { Arc::new(Mutex::new(ActorProps::new(creator))) } + #[cfg_attr( + not(feature = "tokio_executor"), + doc = r##" /// Creates an `ActorProducer` with one or more factory method parameters. /// /// # Examples /// An actor requiring a single parameter. - /// ``` - /// # use riker::actors::*; - /// - /// struct User { - /// name: String, - /// } - /// - /// impl User { - /// fn actor(name: String) -> Self { - /// User { - /// name - /// } - /// } - /// } - /// - /// # impl Actor for User { - /// # type Msg = String; - /// # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} - /// # } - /// // main - /// let sys = ActorSystem::new().unwrap(); - /// - /// let props = Props::new_from_args(User::actor, "Naomi Nagata".into()); - /// - /// let actor = sys.actor_of_props("user", props).unwrap(); - /// ``` + ``` + # use riker::actors::*; + + struct User { + name: String, + } + + impl User { + fn actor(name: String) -> Self { + User { + name + } + } + } + + # impl Actor for User { + # type Msg = String; + # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} + # } + + fn main() { + let sys = ActorSystem::new().unwrap(); + + let props = Props::new_from_args(User::actor, "Naomi Nagata".into()); + + let actor = sys.actor_of_props("user", props).unwrap(); + } + ``` /// An actor requiring multiple parameters. - /// ``` - /// # use riker::actors::*; - /// - /// struct BankAccount { - /// name: String, - /// number: String, - /// } - /// - /// impl BankAccount { - /// fn actor((name, number): (String, String)) -> Self { - /// BankAccount { - /// name, - /// number - /// } - /// } - /// } - /// - /// # impl Actor for BankAccount { - /// # type Msg = String; - /// # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} - /// # } - /// // main - /// let sys = ActorSystem::new().unwrap(); - /// - /// let props = Props::new_from_args(BankAccount::actor, - /// ("James Holden".into(), "12345678".into())); - /// - /// // start the actor and get an `ActorRef` - /// let actor = sys.actor_of_props("bank_account", props).unwrap(); - /// ``` + ``` + # use riker::actors::*; + + struct BankAccount { + name: String, + number: String, + } + + impl BankAccount { + fn actor((name, number): (String, String)) -> Self { + BankAccount { + name, + number + } + } + } + + # impl Actor for BankAccount { + # type Msg = String; + # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} + # } + + fn main() { + let sys = ActorSystem::new().unwrap(); + + let props = Props::new_from_args(BankAccount::actor, + ("James Holden".into(), "12345678".into())); + + // start the actor and get an `ActorRef` + let actor = sys.actor_of_props("bank_account", props).unwrap(); + } + ``` + "## + )] #[inline] pub fn new_from_args( creator: F, @@ -127,55 +143,64 @@ impl Props { Arc::new(Mutex::new(ActorPropsWithArgs::new(creator, args))) } + #[cfg_attr( + not(feature = "tokio_executor"), + doc = r##" /// Creates an `ActorProducer` from default constructible type with no factory method parameters. /// /// # Examples /// - /// ``` - /// # use riker::actors::*; - /// - /// #[derive(Default)] - /// struct User; - /// - /// # impl Actor for User { - /// # type Msg = String; - /// # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} - /// # } - /// // main - /// let sys = ActorSystem::new().unwrap(); - /// - /// let props = Props::new::(); - /// - /// // start the actor and get an `ActorRef` - /// let actor = sys.actor_of_props("user", props).unwrap(); - /// ``` - /// Creates an `ActorProducer` from a type which implements ActorFactory with no factory method parameters. - /// - /// # Examples - /// - /// ``` - /// # use riker::actors::*; - /// - /// struct User; - /// - /// impl ActorFactory for User { - /// fn create() -> Self { - /// User - /// } - /// } - /// - /// # impl Actor for User { - /// # type Msg = String; - /// # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} - /// # } - /// // main - /// let sys = ActorSystem::new().unwrap(); - /// - /// let props = Props::new::(); - /// - /// // start the actor and get an `ActorRef` - /// let actor = sys.actor_of_props("user", props).unwrap(); - /// ``` + ``` + # use riker::actors::*; + + #[derive(Default)] + struct User; + + # impl Actor for User { + # type Msg = String; + # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} + # } + + fn main() { + let sys = ActorSystem::new().unwrap(); + + let props = Props::new::(); + + // start the actor and get an `ActorRef` + let actor = sys.actor_of_props("user", props).unwrap(); + } + ``` + Creates an `ActorProducer` from a type which implements ActorFactory with no factory method parameters. + + # Examples + + ``` + # use riker::actors::*; + + struct User; + + impl ActorFactory for User { + fn create() -> Self { + User + } + } + + # impl Actor for User { + # type Msg = String; + # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} + # } + + fn main() { + let sys = ActorSystem::new().unwrap(); + + let props = Props::new::(); + + // start the actor and get an `ActorRef` + let actor = sys.actor_of_props("user", props).unwrap(); + } + ``` + "## + )] #[inline] pub fn new() -> Arc>> where @@ -184,67 +209,76 @@ impl Props { Self::new_from(A::create) } + #[cfg_attr( + not(feature = "tokio_executor"), + doc = r##" /// Creates an `ActorProducer` from a type which implements ActorFactoryArgs with one or more factory method parameters. /// /// # Examples /// An actor requiring a single parameter. - /// ``` - /// # use riker::actors::*; - /// - /// struct User { - /// name: String, - /// } - /// - /// impl ActorFactoryArgs for User { - /// fn create_args(name: String) -> Self { - /// User { - /// name - /// } - /// } - /// } - /// - /// # impl Actor for User { - /// # type Msg = String; - /// # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} - /// # } - /// // main - /// let sys = ActorSystem::new().unwrap(); - /// - /// let props = Props::new_args::("Naomi Nagata".into()); - /// - /// let actor = sys.actor_of_props("user", props).unwrap(); - /// ``` - /// An actor requiring multiple parameters. - /// ``` - /// # use riker::actors::*; - /// - /// struct BankAccount { - /// name: String, - /// number: String, - /// } - /// - /// impl ActorFactoryArgs<(String, String)> for BankAccount { - /// fn create_args((name, number): (String, String)) -> Self { - /// BankAccount { - /// name, - /// number - /// } - /// } - /// } - /// - /// # impl Actor for BankAccount { - /// # type Msg = String; - /// # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} - /// # } - /// // main - /// let sys = ActorSystem::new().unwrap(); - /// - /// let props = Props::new_from_args(BankAccount::create_args, - /// ("James Holden".into(), "12345678".into())); - /// - /// // start the actor and get an `ActorRef` - /// let actor = sys.actor_of_props("bank_account", props).unwrap(); - /// ``` + ``` + # use riker::actors::*; + + struct User { + name: String, + } + + impl ActorFactoryArgs for User { + fn create_args(name: String) -> Self { + User { + name + } + } + } + + # impl Actor for User { + # type Msg = String; + # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} + # } + + fn main() { + let sys = ActorSystem::new().unwrap(); + + let props = Props::new_args::("Naomi Nagata".into()); + + let actor = sys.actor_of_props("user", props).unwrap(); + } + ``` + An actor requiring multiple parameters. + ``` + # use riker::actors::*; + + struct BankAccount { + name: String, + number: String, + } + + impl ActorFactoryArgs<(String, String)> for BankAccount { + fn create_args((name, number): (String, String)) -> Self { + BankAccount { + name, + number + } + } + } + + # impl Actor for BankAccount { + # type Msg = String; + # fn recv(&mut self, _ctx: &Context, _msg: String, _sender: Sender) {} + # } + + fn main() { + let sys = ActorSystem::new().unwrap(); + + let props = Props::new_from_args(BankAccount::create_args, + ("James Holden".into(), "12345678".into())); + + // start the actor and get an `ActorRef` + let actor = sys.actor_of_props("bank_account", props).unwrap(); + } + ``` + "## + )] #[inline] pub fn new_args(args: Args) -> Arc>> where diff --git a/src/executor.rs b/src/executor.rs new file mode 100644 index 00000000..463c5eab --- /dev/null +++ b/src/executor.rs @@ -0,0 +1,137 @@ +use config::Config; +use futures::{ + channel::oneshot::Receiver, + task::{Context as PollContext, Poll}, + Future, +}; +use std::{error::Error, pin::Pin, sync::Arc}; + +pub type ExecutorHandle = Arc; + +pub trait Task: Future + Send {} +impl + Send> Task for T {} + +pub trait TaskExecutor { + fn spawn(&self, future: Pin>) -> Result>, Box>; +} +pub trait TaskExec: + Future>> + Unpin + Send + Sync +{ + fn abort(self: Box); + fn forget(self: Box); +} +pub struct TaskHandle { + handle: Box>, + recv: Receiver, +} +impl TaskHandle { + pub fn new(handle: Box>, recv: Receiver) -> Self { + Self { handle, recv } + } +} +impl Future for TaskHandle { + type Output = Result>; + fn poll(mut self: Pin<&mut Self>, cx: &mut PollContext<'_>) -> Poll { + if Pin::new(&mut *self.handle).poll(cx).is_ready() { + if let Poll::Ready(val) = as Future>::poll(Pin::new(&mut self.recv), cx) { + self.recv.close(); + return Poll::Ready(val.map_err(|e| Box::new(e) as Box)); + } + } + Poll::Pending + } +} +impl TaskHandle { + pub fn abort(self) { + self.handle.abort() + } + pub fn forget(self) { + self.handle.forget() + } +} +impl TaskExec for TaskHandle { + fn abort(self: Box) { + self.handle.abort() + } + fn forget(self: Box) { + self.handle.forget() + } +} + +pub use executor_impl::*; +#[cfg(feature = "tokio_executor")] +mod executor_impl { + pub fn get_executor_handle(_: &Config) -> ExecutorHandle { + Arc::new(TokioExecutor(tokio::runtime::Handle::current())) + } + use super::*; + pub struct TokioExecutor(pub tokio::runtime::Handle); + impl TaskExecutor for TokioExecutor { + fn spawn( + &self, + future: Pin>, + ) -> Result>, Box> { + Ok(Box::new(TokioJoinHandle(self.0.spawn(future)))) + } + } + struct TokioJoinHandle(tokio::task::JoinHandle<()>); + impl Future for TokioJoinHandle { + type Output = Result<(), Box>; + fn poll(mut self: Pin<&mut Self>, cx: &mut PollContext<'_>) -> Poll { + Future::poll(Pin::new(&mut self.0), cx) + .map_err(|e| Box::new(e) as Box) + } + } + impl TaskExec<()> for TokioJoinHandle { + fn abort(self: Box) { + self.0.abort(); + } + fn forget(self: Box) { + drop(self); + } + } +} + +#[cfg(not(feature = "tokio_executor"))] +mod executor_impl { + use super::*; + use crate::system::ThreadPoolConfig; + use futures::task::SpawnExt; + pub fn get_executor_handle(cfg: &Config) -> ExecutorHandle { + let exec_cfg = ThreadPoolConfig::from(cfg); + let pool = futures::executor::ThreadPoolBuilder::new() + .pool_size(exec_cfg.pool_size) + .stack_size(exec_cfg.stack_size) + .name_prefix("pool-thread-#") + .create() + .unwrap(); + Arc::new(FuturesExecutor(pool)) + } + pub struct FuturesExecutor(pub futures::executor::ThreadPool); + impl TaskExecutor for FuturesExecutor { + fn spawn( + &self, + future: Pin>, + ) -> Result>, Box> { + self.0 + .spawn_with_handle(future) + .map(|h| Box::new(FuturesJoinHandle(h)) as Box>) + .map_err(|e| Box::new(e) as Box) + } + } + struct FuturesJoinHandle(futures::future::RemoteHandle<()>); + impl Future for FuturesJoinHandle { + type Output = Result<(), Box>; + fn poll(mut self: Pin<&mut Self>, cx: &mut PollContext<'_>) -> Poll { + Future::poll(Pin::new(&mut self.0), cx).map(|_| Ok(()) as Result<(), Box>) + } + } + impl TaskExec<()> for FuturesJoinHandle { + fn abort(self: Box) { + drop(self) + } + fn forget(self: Box) { + self.0.forget() + } + } +} diff --git a/src/kernel.rs b/src/kernel.rs index 1150f4a8..117bbcb0 100644 --- a/src/kernel.rs +++ b/src/kernel.rs @@ -18,7 +18,7 @@ use std::{ sync::{Arc, Mutex}, }; -use futures::{channel::mpsc::channel, task::SpawnExt, StreamExt}; +use futures::{channel::mpsc::channel, StreamExt}; use slog::warn; use crate::{ @@ -28,7 +28,7 @@ use crate::{ kernel_ref::KernelRef, mailbox::{flush_to_deadletters, run_mailbox, Mailbox}, }, - system::{ActorRestarted, ActorTerminated, SystemMsg}, + system::{ActorRestarted, ActorTerminated, Run, SystemMsg}, Message, }; @@ -98,7 +98,7 @@ where } }; - sys.exec.spawn(f).unwrap(); + sys.run(f).unwrap().forget(); Ok(kr) } diff --git a/src/kernel/kernel_ref.rs b/src/kernel/kernel_ref.rs index 6c235c46..715b6fc9 100644 --- a/src/kernel/kernel_ref.rs +++ b/src/kernel/kernel_ref.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{channel::mpsc::Sender, task::SpawnExt, SinkExt}; +use futures::{channel::mpsc::Sender, SinkExt}; use crate::{ actor::{MsgError, MsgResult}, @@ -8,7 +8,7 @@ use crate::{ mailbox::{AnyEnqueueError, AnySender, MailboxSchedule, MailboxSender}, KernelMsg, }, - system::ActorSystem, + system::{ActorSystem, Run}, AnyMessage, Envelope, Message, }; @@ -36,11 +36,11 @@ impl KernelRef { fn send(&self, msg: KernelMsg, sys: &ActorSystem) { let mut tx = self.tx.clone(); - sys.exec - .spawn(async move { - drop(tx.send(msg).await); - }) - .unwrap(); + sys.run(async move { + drop(tx.send(msg).await); + }) + .unwrap() + .forget(); } } diff --git a/src/lib.rs b/src/lib.rs index b3a81be1..ebddd2d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ mod validate; pub mod actor; +pub mod executor; pub mod kernel; pub mod system; diff --git a/src/system.rs b/src/system.rs index 72e9a060..41ccb20a 100644 --- a/src/system.rs +++ b/src/system.rs @@ -139,13 +139,7 @@ use std::{ use chrono::prelude::*; use config::Config; -use futures::{ - channel::oneshot, - executor::{ThreadPool, ThreadPoolBuilder}, - future::RemoteHandle, - task::{SpawnError, SpawnExt}, - Future, -}; +use futures::{channel::oneshot, Future, FutureExt}; use uuid::Uuid; @@ -177,7 +171,7 @@ pub struct SystemBuilder { name: Option, cfg: Option, log: Option, - exec: Option, + exec: Option, } impl SystemBuilder { @@ -211,7 +205,7 @@ impl SystemBuilder { } } - pub fn exec(self, exec: ThreadPool) -> Self { + pub fn exec(self, exec: ExecutorHandle) -> Self { SystemBuilder { exec: Some(exec), ..self @@ -252,6 +246,10 @@ impl Deref for LoggingSystem { } } +use crate::executor::{get_executor_handle, ExecutorHandle, TaskExecutor, TaskHandle}; +pub fn default_exec(cfg: &Config) -> ExecutorHandle { + get_executor_handle(cfg) +} /// The actor runtime and common services coordinator /// /// The `ActorSystem` provides a runtime on which actors are executed. @@ -266,12 +264,11 @@ pub struct ActorSystem { sys_actors: Option, log: LoggingSystem, debug: bool, - pub exec: ThreadPool, + pub exec: ExecutorHandle, pub timer: TimerRef, pub sys_channels: Option, pub(crate) provider: Provider, } - impl ActorSystem { /// Create a new `ActorSystem` instance /// @@ -283,6 +280,15 @@ impl ActorSystem { ActorSystem::create("riker", exec, log, cfg) } + /// Create a new `ActorSystem` instance with provided executor + /// + /// Requires a type that implements the `TaskExecutor` trait. + pub fn with_executor(exec: impl TaskExecutor + 'static) -> Result { + let cfg = load_config(); + let log = default_log(&cfg); + + ActorSystem::create("riker", Arc::new(exec), log, cfg) + } /// Create a new `ActorSystem` instance with provided name /// @@ -305,7 +311,7 @@ impl ActorSystem { fn create( name: &str, - exec: ThreadPool, + exec: ExecutorHandle, log: LoggingSystem, cfg: Config, ) -> Result { @@ -663,22 +669,30 @@ impl ActorSelectionFactory for ActorSystem { } } +use std::error::Error; // futures::task::Spawn::spawn requires &mut self so // we'll create a wrapper trait that requires only &self. pub trait Run { - fn run(&self, future: Fut) -> Result::Output>, SpawnError> + fn run(&self, future: Fut) -> Result::Output>, Box> where Fut: Future + Send + 'static, ::Output: Send; } impl Run for ActorSystem { - fn run(&self, future: Fut) -> Result::Output>, SpawnError> + fn run(&self, future: Fut) -> Result::Output>, Box> where Fut: Future + Send + 'static, ::Output: Send, { - self.exec.spawn_with_handle(future) + let (sender, recv) = futures::channel::oneshot::channel::(); + let handle = self.exec.spawn(Box::pin( + async move { + drop(sender.send(future.await)); + } + .boxed(), + ))?; + Ok(TaskHandle::new(handle, recv)) } } @@ -853,9 +867,9 @@ impl<'a> From<&'a Config> for SystemSettings { } } -struct ThreadPoolConfig { - pool_size: usize, - stack_size: usize, +pub(crate) struct ThreadPoolConfig { + pub pool_size: usize, + pub stack_size: usize, } impl<'a> From<&'a Config> for ThreadPoolConfig { @@ -867,16 +881,6 @@ impl<'a> From<&'a Config> for ThreadPoolConfig { } } -fn default_exec(cfg: &Config) -> ThreadPool { - let exec_cfg = ThreadPoolConfig::from(cfg); - ThreadPoolBuilder::new() - .pool_size(exec_cfg.pool_size) - .stack_size(exec_cfg.stack_size) - .name_prefix("pool-thread-#") - .create() - .unwrap() -} - #[derive(Clone)] pub struct SysActors { pub root: BasicActorRef, diff --git a/tests/actors.rs b/tests/actors.rs index 4f956a89..61cdf76f 100644 --- a/tests/actors.rs +++ b/tests/actors.rs @@ -1,10 +1,12 @@ -#[macro_use] -extern crate riker_testkit; - use riker::actors::*; -use riker_testkit::probe::channel::{probe, ChannelProbe}; -use riker_testkit::probe::{Probe, ProbeReceive}; +use riker_testkit::{ + p_assert_eq, + probe::{ + channel::{probe, ChannelProbe}, + Probe, ProbeReceive, + }, +}; #[derive(Clone, Debug)] pub struct Add; @@ -42,17 +44,16 @@ impl Receive for Counter { fn receive(&mut self, _ctx: &Context, _msg: Add, _sender: Sender) { self.count += 1; if self.count == 1_000_000 { - self.probe.as_ref().unwrap().0.event(()) + self.probe.as_ref().unwrap().0.event(()); } } } -#[test] +#[riker_testkit::test] fn actor_create() { let sys = ActorSystem::new().unwrap(); assert!(sys.actor_of::("valid-name").is_ok()); - match sys.actor_of::("/") { Ok(_) => panic!("test should not reach here"), Err(e) => { @@ -78,30 +79,29 @@ fn actor_create() { assert!(sys.actor_of::("!").is_err()); } -#[test] +#[riker_testkit::test] fn actor_tell() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("me").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); actor.tell(TestProbe(probe), None); for _ in 0..1_000_000 { actor.tell(Add, None); } - p_assert_eq!(listen, ()); } -#[test] +#[riker_testkit::test] fn actor_try_tell() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("me").unwrap(); let actor: BasicActorRef = actor.into(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); actor .try_tell(CounterMsg::TestProbe(TestProbe(probe)), None) .unwrap(); @@ -155,18 +155,20 @@ impl Actor for Child { fn recv(&mut self, _: &Context, _: Self::Msg, _: Sender) {} } -#[test] -#[allow(dead_code)] +#[riker_testkit::test] fn actor_stop() { let system = ActorSystem::new().unwrap(); let parent = system.actor_of::("parent").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); parent.tell(TestProbe(probe), None); system.print_tree(); // wait for the probe to arrive at the actor before attempting to stop the actor + #[cfg(feature = "tokio_executor")] + listen.recv().await; + #[cfg(not(feature = "tokio_executor"))] listen.recv(); system.stop(&parent); diff --git a/tests/channels.rs b/tests/channels.rs index 8982209d..ee2fdf84 100644 --- a/tests/channels.rs +++ b/tests/channels.rs @@ -1,10 +1,12 @@ -#[macro_use] -extern crate riker_testkit; - use riker::actors::*; -use riker_testkit::probe::channel::{probe, ChannelProbe}; -use riker_testkit::probe::{Probe, ProbeReceive}; +use riker_testkit::{ + p_assert_eq, + probe::{ + channel::{probe, ChannelProbe}, + Probe, ProbeReceive, + }, +}; #[derive(Clone, Debug)] pub struct TestProbe(ChannelProbe<(), ()>); @@ -66,7 +68,7 @@ impl Receive for Subscriber { } } -#[test] +#[riker_testkit::test] fn channel_publish() { let sys = ActorSystem::new().unwrap(); @@ -80,10 +82,14 @@ fn channel_publish() { .actor_of_args::("sub-actor", (chan.clone(), topic.clone())) .unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); + sub.tell(TestProbe(probe), None); // wait for the probe to arrive at the actor before publishing message + #[cfg(feature = "tokio_executor")] + listen.recv().await; + #[cfg(not(feature = "tokio_executor"))] listen.recv(); // Publish a test message @@ -98,7 +104,7 @@ fn channel_publish() { p_assert_eq!(listen, ()); } -#[test] +#[riker_testkit::test] fn channel_publish_subscribe_all() { let sys = ActorSystem::new().unwrap(); @@ -112,10 +118,14 @@ fn channel_publish_subscribe_all() { .actor_of_args::("sub-actor", (chan.clone(), topic)) .unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); + sub.tell(TestProbe(probe), None); // wait for the probe to arrive at the actor before publishing message + #[cfg(feature = "tokio_executor")] + listen.recv().await; + #[cfg(not(feature = "tokio_executor"))] listen.recv(); // Publish a test message to topic "topic-1" @@ -237,34 +247,38 @@ impl Receive for EventSubscriber { match msg { SystemEvent::ActorCreated(created) => { if created.actor.path() == "/user/dumb-actor" { - self.probe.as_ref().unwrap().0.event(()) + self.probe.as_ref().unwrap().0.event(()); } } SystemEvent::ActorRestarted(restarted) => { if restarted.actor.path() == "/user/dumb-actor" { - self.probe.as_ref().unwrap().0.event(()) + self.probe.as_ref().unwrap().0.event(()); } } SystemEvent::ActorTerminated(terminated) => { if terminated.actor.path() == "/user/dumb-actor" { - self.probe.as_ref().unwrap().0.event(()) + self.probe.as_ref().unwrap().0.event(()); } } } } } -#[test] +#[riker_testkit::test] fn channel_system_events() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("event-sub").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); + actor.tell(TestProbe(probe), None); // wait for the probe to arrive at the actor before attempting // create, restart and stop + #[cfg(feature = "tokio_executor")] + listen.recv().await; + #[cfg(not(feature = "tokio_executor"))] listen.recv(); // Create an actor @@ -327,21 +341,28 @@ impl Receive for DeadLetterSub { } } -#[test] +#[riker_testkit::test] fn channel_dead_letters() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("dl-subscriber").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); + actor.tell(TestProbe(probe), None); // wait for the probe to arrive at the actor before attempting to stop the actor + #[cfg(feature = "tokio_executor")] + listen.recv().await; + #[cfg(not(feature = "tokio_executor"))] listen.recv(); let dumb = sys.actor_of::("dumb-actor").unwrap(); // immediately stop the actor and attempt to send a message sys.stop(&dumb); + #[cfg(feature = "tokio_executor")] + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + #[cfg(not(feature = "tokio_executor"))] std::thread::sleep(std::time::Duration::from_secs(1)); dumb.tell(SomeMessage, None); diff --git a/tests/logger.rs b/tests/logger.rs index 89f15f0a..8b7e42e6 100644 --- a/tests/logger.rs +++ b/tests/logger.rs @@ -1,8 +1,9 @@ -use futures::executor::block_on; - use riker::actors::*; use slog::{o, Fuse, Logger}; +#[cfg(not(feature = "tokio_executor"))] +use futures::executor::block_on; + mod common { use std::{fmt, result}; @@ -42,18 +43,22 @@ mod common { } } -#[test] +#[riker_testkit::test] fn system_create_with_slog() { let log = Logger::root( Fuse(common::PrintlnDrain), o!("version" => "v1", "run_env" => "test"), ); let sys = SystemBuilder::new().log(log).create().unwrap(); + + #[cfg(feature = "tokio_executor")] + sys.shutdown().await.unwrap(); + #[cfg(not(feature = "tokio_executor"))] block_on(sys.shutdown()).unwrap(); } // a test that logging without slog using "log" crate works -#[test] +#[riker_testkit::test] fn logging_stdlog() { log::info!("before the system"); let _sys = ActorSystem::new().unwrap(); diff --git a/tests/scheduling.rs b/tests/scheduling.rs index fe6338ba..d852368c 100644 --- a/tests/scheduling.rs +++ b/tests/scheduling.rs @@ -1,8 +1,6 @@ -#[macro_use] -extern crate riker_testkit; - use riker::actors::*; +use riker_testkit::p_assert_eq; use riker_testkit::probe::channel::{probe, ChannelProbe}; use riker_testkit::probe::{Probe, ProbeReceive}; @@ -47,26 +45,26 @@ impl Receive for ScheduleOnce { } } -#[test] +#[riker_testkit::test] fn schedule_once() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("schedule-once").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); // use scheduler to set up probe sys.schedule_once(Duration::from_millis(200), actor, None, TestProbe(probe)); p_assert_eq!(listen, ()); } -#[test] +#[riker_testkit::test] fn schedule_at_time() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("schedule-once").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); // use scheduler to set up probe at a specific time let schedule_at = Utc::now() + CDuration::milliseconds(200); @@ -122,13 +120,13 @@ impl Receive for ScheduleRepeat { } } -#[test] +#[riker_testkit::test] fn schedule_repeat() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("schedule-repeat").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); actor.tell(TestProbe(probe), None); diff --git a/tests/selection.rs b/tests/selection.rs index 981e92b5..e51fd6ea 100644 --- a/tests/selection.rs +++ b/tests/selection.rs @@ -1,8 +1,6 @@ -#[macro_use] -extern crate riker_testkit; - use riker::actors::*; +use riker_testkit::p_assert_eq; use riker_testkit::probe::channel::{probe, ChannelProbe}; use riker_testkit::probe::{Probe, ProbeReceive}; @@ -41,13 +39,13 @@ impl Actor for SelectTest { } } -#[test] +#[riker_testkit::test] fn select_child() { let sys = ActorSystem::new().unwrap(); sys.actor_of::("select-actor").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); // select test actors through actor selection: /root/user/select-actor/* let sel = sys.select("select-actor").unwrap(); @@ -57,7 +55,7 @@ fn select_child() { p_assert_eq!(listen, ()); } -#[test] +#[riker_testkit::test] fn select_child_of_child() { let sys = ActorSystem::new().unwrap(); @@ -65,9 +63,12 @@ fn select_child_of_child() { // delay to allow 'select-actor' pre_start to create 'child_a' and 'child_b' // Direct messaging on the actor_ref doesn't have this same issue + #[cfg(feature = "tokio_executor")] + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + #[cfg(not(feature = "tokio_executor"))] std::thread::sleep(std::time::Duration::from_millis(500)); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); // select test actors through actor selection: /root/user/select-actor/* let sel = sys.select("select-actor/child_a").unwrap(); @@ -77,7 +78,7 @@ fn select_child_of_child() { p_assert_eq!(listen, ()); } -#[test] +#[riker_testkit::test] fn select_all_children_of_child() { let sys = ActorSystem::new().unwrap(); @@ -85,9 +86,12 @@ fn select_all_children_of_child() { // delay to allow 'select-actor' pre_start to create 'child_a' and 'child_b' // Direct messaging on the actor_ref doesn't have this same issue + #[cfg(feature = "tokio_executor")] + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + #[cfg(not(feature = "tokio_executor"))] std::thread::sleep(std::time::Duration::from_millis(500)); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); // select relative test actors through actor selection: /root/user/select-actor/* let sel = sys.select("select-actor/*").unwrap(); @@ -143,13 +147,14 @@ impl Actor for SelectTest2 { } } -#[test] +#[riker_testkit::test] fn select_from_context() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("select-actor").unwrap(); - let (probe, listen) = probe(); + let (probe, mut listen) = probe(); + actor.tell(TestProbe(probe), None); // seven events back expected: @@ -162,7 +167,7 @@ fn select_from_context() { p_assert_eq!(listen, ()); } -#[test] +#[riker_testkit::test] fn select_paths() { let sys = ActorSystem::new().unwrap(); diff --git a/tests/supervision.rs b/tests/supervision.rs index e0d2564b..9aeb9277 100644 --- a/tests/supervision.rs +++ b/tests/supervision.rs @@ -1,8 +1,6 @@ -#[macro_use] -extern crate riker_testkit; - use riker::actors::*; +use riker_testkit::p_assert_eq; use riker_testkit::probe::channel::{probe, ChannelProbe}; use riker_testkit::probe::{Probe, ProbeReceive}; @@ -98,7 +96,7 @@ impl Receive for RestartSup { } } -#[test] +#[riker_testkit::test] fn supervision_restart_failed_actor() { let sys = ActorSystem::new().unwrap(); @@ -110,7 +108,8 @@ fn supervision_restart_failed_actor() { // Make the test actor panic sup.tell(Panic, None); - let (probe, listen) = probe::<()>(); + let (probe, mut listen) = probe::<()>(); + sup.tell(TestProbe(probe), None); p_assert_eq!(listen, ()); } @@ -203,7 +202,7 @@ impl Receive for EscRestartSup { } } -#[test] +#[riker_testkit::test] fn supervision_escalate_failed_actor() { let sys = ActorSystem::new().unwrap(); @@ -212,8 +211,13 @@ fn supervision_escalate_failed_actor() { // Make the test actor panic sup.tell(Panic, None); - let (probe, listen) = probe::<()>(); + let (probe, mut listen) = probe::<()>(); + + #[cfg(feature = "tokio_executor")] + tokio::time::sleep(std::time::Duration::from_millis(2000)).await; + #[cfg(not(feature = "tokio_executor"))] std::thread::sleep(std::time::Duration::from_millis(2000)); + sup.tell(TestProbe(probe), None); p_assert_eq!(listen, ()); sys.print_tree(); diff --git a/tests/system.rs b/tests/system.rs index 702c8cf3..ab1882e2 100644 --- a/tests/system.rs +++ b/tests/system.rs @@ -1,7 +1,9 @@ -use futures::executor::block_on; use riker::actors::*; -#[test] +#[cfg(not(feature = "tokio_executor"))] +use futures::executor::block_on; + +#[riker_testkit::test] fn system_create() { assert!(ActorSystem::new().is_ok()); assert!(ActorSystem::with_name("valid-name").is_ok()); @@ -40,8 +42,7 @@ impl Actor for ShutdownTest { fn recv(&mut self, _: &Context, _: Self::Msg, _: Sender) {} } -#[test] -#[allow(dead_code)] +#[riker_testkit::test] fn system_shutdown() { let sys = ActorSystem::new().unwrap(); @@ -49,21 +50,27 @@ fn system_shutdown() { .actor_of_args::("test-actor-1", 1) .unwrap(); + #[cfg(feature = "tokio_executor")] + sys.shutdown().await.unwrap(); + #[cfg(not(feature = "tokio_executor"))] block_on(sys.shutdown()).unwrap(); } -#[test] +#[riker_testkit::test] fn system_futures_exec() { let sys = ActorSystem::new().unwrap(); for i in 0..100 { let f = sys.run(async move { format!("some_val_{}", i) }).unwrap(); - - assert_eq!(block_on(f), format!("some_val_{}", i)); + #[cfg(feature = "tokio_executor")] + let result = f.await; + #[cfg(not(feature = "tokio_executor"))] + let result = block_on(f); + assert_eq!(result.unwrap(), format!("some_val_{}", i)); } } -#[test] +#[riker_testkit::test] fn system_futures_panic() { let sys = ActorSystem::new().unwrap(); @@ -77,23 +84,34 @@ fn system_futures_panic() { for i in 0..100 { let f = sys.run(async move { format!("some_val_{}", i) }).unwrap(); - - assert_eq!(block_on(f), format!("some_val_{}", i)); + #[cfg(feature = "tokio_executor")] + let result = f.await; + #[cfg(not(feature = "tokio_executor"))] + let result = block_on(f); + assert_eq!(result.unwrap(), format!("some_val_{}", i)); } } -#[test] +#[riker_testkit::test] fn system_load_app_config() { let sys = ActorSystem::new().unwrap(); assert_eq!(sys.config().get_int("app.some_setting").unwrap() as i64, 1); } -#[test] +#[riker_testkit::test] fn system_builder() { let sys = SystemBuilder::new().create().unwrap(); + + #[cfg(feature = "tokio_executor")] + sys.shutdown().await.unwrap(); + #[cfg(not(feature = "tokio_executor"))] block_on(sys.shutdown()).unwrap(); let sys = SystemBuilder::new().name("my-sys").create().unwrap(); + + #[cfg(feature = "tokio_executor")] + sys.shutdown().await.unwrap(); + #[cfg(not(feature = "tokio_executor"))] block_on(sys.shutdown()).unwrap(); }