Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tokio executor #1

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target
Cargo.lock
**/target
**/*.rs.bk
.vscode
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
[package]
name = "riker-testkit"
version = "0.1.0"
edition = "2018"
authors = ["Lee Smith <[email protected]>"]
description = "Tools to make testing Riker applications easier"
homepage = "http://riker.rs/logging"
license = "MIT"
readme = "README.md"

[features]
default = []
tokio_executor = ["tokio", "async-trait", "test_fn_macro/tokio_executor"]
[dependencies]
chrono = "0.4"
tokio = { version = "^1", features = ["rt-multi-thread", "macros", "sync"], optional = true}
async-trait = { version="0.1.42", optional = true}
test_fn_macro = { path = "./test_fn_macro" }
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
extern crate chrono;

pub use test_fn_macro::test;
pub mod probe;
143 changes: 87 additions & 56 deletions src/probe.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@

pub trait Probe {
type Msg: Send;
type Pay: Clone + Send;
type Pay: Clone + Send + Sync;

fn event(&self, evt: Self::Msg);
fn payload(&self) -> &Self::Pay;
}

#[cfg_attr(feature = "tokio_executor", async_trait::async_trait)]
pub trait ProbeReceive {
type Msg: Send;

fn recv(&self) -> Self::Msg;
#[cfg(feature = "tokio_executor")]
async fn recv(&mut self) -> Self::Msg;
#[cfg(not(feature = "tokio_executor"))]
fn recv(&mut self) -> Self::Msg;
fn reset_timer(&mut self);
fn last_event_milliseconds(&self) -> u64;
fn last_event_seconds(&self) -> u64;
Expand All @@ -23,13 +26,28 @@ pub mod channel {
use super::{Probe, ProbeReceive};

use chrono::prelude::*;
use std::sync::mpsc::{channel, Sender, Receiver};

#[cfg(feature = "tokio_executor")]
use tokio::sync::mpsc::{
channel,
Sender,
Receiver,
};
#[cfg(not(feature = "tokio_executor"))]
use std::sync::mpsc::{
channel,
Sender,
Receiver,
};

pub fn probe<T: Send>() -> (ChannelProbe<(), T>, ChannelProbeReceive<T>) {
probe_with_payload(())
}

pub fn probe_with_payload<P: Clone + Send, T: Send>(payload: P) -> (ChannelProbe<P, T>, ChannelProbeReceive<T>) {
#[cfg(feature = "tokio_executor")]
let (tx, rx) = channel::<T>(100);
#[cfg(not(feature = "tokio_executor"))]
let (tx, rx) = channel::<T>();

let probe = ChannelProbe {
Expand All @@ -52,31 +70,37 @@ pub mod channel {
tx: Sender<T>,
}

impl<P, T> Probe for ChannelProbe<P, T>
where P: Clone + Send, T: Send {
impl<P, T: std::fmt::Debug + 'static> Probe for ChannelProbe<P, T>
where P: Clone + Send + Sync, T: Send {
type Msg = T;
type Pay = P;

fn event(&self, evt: T) {
drop(self.tx.send(evt));
let tx = self.clone().tx.clone();
#[cfg(feature = "tokio_executor")]
tokio::spawn(async move {
tx.send(evt).await.unwrap();
});
#[cfg(not(feature = "tokio_executor"))]
drop(tx.send(evt));
}

fn payload(&self) -> &P {
&self.payload.as_ref().unwrap()
}
}

impl<P, T> Probe for Option<ChannelProbe<P, T>>
where P: Clone + Send, T: Send {
impl<P, T: std::fmt::Debug + 'static> Probe for Option<ChannelProbe<P, T>>
where P: Clone + Send + Sync, T: Send {
type Msg = T;
type Pay = P;

fn event(&self, evt: T) {
drop(self.as_ref().unwrap().tx.send(evt));
self.as_ref().unwrap().event(evt);
}

fn payload(&self) -> &P {
&self.as_ref().unwrap().payload.as_ref().unwrap()
self.as_ref().unwrap().payload()
}
}

Expand All @@ -87,10 +111,16 @@ pub mod channel {
timer_start: DateTime<Utc>,
}

#[cfg_attr(feature = "tokio_executor", async_trait::async_trait)]
impl<T: Send> ProbeReceive for ChannelProbeReceive<T> {
type Msg = T;

fn recv(&self) -> T {
#[cfg(feature = "tokio_executor")]
async fn recv(&mut self) -> T {
self.rx.recv().await.unwrap()
}
#[cfg(not(feature = "tokio_executor"))]
fn recv(&mut self) -> T {
self.rx.recv().unwrap()
}

Expand All @@ -110,50 +140,18 @@ pub mod channel {
}
}

#[cfg(test)]
mod tests {
use super::{Probe, ProbeReceive};
use super::channel::{probe, probe_with_payload};
use std::thread;

#[test]
fn chan_probe() {
let (probe, listen) = probe();

thread::spawn(move || {
probe.event("some event");
});

assert_eq!(listen.recv(), "some event");
}

#[test]
fn chan_probe_with_payload() {
let payload = "test data".to_string();
let (probe, listen) = probe_with_payload(payload);

thread::spawn(move || {
// only event the expected result if the payload is what we expect
if probe.payload() == "test data" {
probe.event("data received");
} else {
probe.event("");
}

});

assert_eq!(listen.recv(), "data received");
}
}


/// Macros that provide easy use of Probes
#[macro_use]
pub mod macros {
/// Mimicks assert_eq!
/// Performs an assert_eq! on the first event sent by the probe.
#[macro_export]
macro_rules! p_assert_eq {
($listen:expr, $expected:expr) => {
#[cfg(feature = "tokio_executor")]
assert_eq!($listen.recv().await, $expected);
#[cfg(not(feature = "tokio_executor"))]
assert_eq!($listen.recv(), $expected);
};
}
Expand All @@ -168,7 +166,11 @@ pub mod macros {
let mut expected = $expected.clone(); // so we don't need the original mutable

loop {
match expected.iter().position(|x| x == &$listen.recv()) {
#[cfg(feature = "tokio_executor")]
let val = $listen.recv().await;
#[cfg(not(feature = "tokio_executor"))]
let val = $listen.recv();
match expected.iter().position(|x| x == &val) {
Some(pos) => {
expected.remove(pos);
if expected.len() == 0 {
Expand All @@ -193,21 +195,20 @@ pub mod macros {

#[cfg(test)]
mod tests {
use probe::{Probe, ProbeReceive};
use probe::channel::probe;
use crate::probe::{Probe, ProbeReceive, channel::probe};

#[test]
#[test_fn_macro::test]
fn p_assert_eq() {
let (probe, listen) = probe();
let (probe, mut listen) = probe();

probe.event("test".to_string());

p_assert_eq!(listen, "test".to_string());
}

#[test]
#[test_fn_macro::test]
fn p_assert_events() {
let (probe, listen) = probe();
let (probe, mut listen) = probe();

let expected = vec!["event_1", "event_2", "event_3"];
probe.event("event_1");
Expand All @@ -217,7 +218,7 @@ pub mod macros {
p_assert_events!(listen, expected);
}

#[test]
#[test_fn_macro::test]
fn p_timer() {
let (probe, listen) = probe();
probe.event("event_3");
Expand All @@ -227,3 +228,33 @@ pub mod macros {

}
}

#[cfg(test)]
mod tests {
use super::{Probe, ProbeReceive};
use super::channel::{probe, probe_with_payload};

#[test_fn_macro::test]
fn chan_probe() {
let (probe, mut listen) = probe();

probe.event("some event");

p_assert_eq!(listen, "some event");
}

#[test_fn_macro::test]
fn chan_probe_with_payload() {
let payload = "test data".to_string();
let (probe, mut listen) = probe_with_payload(payload);

// only event the expected result if the payload is what we expect
if probe.payload() == "test data" {
probe.event("data received");
} else {
probe.event("");
}

p_assert_eq!(listen, "data received");
}
}
17 changes: 17 additions & 0 deletions test_fn_macro/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "test_fn_macro"
version = "0.1.0"
authors = ["Linus Behrbohm <[email protected]>"]
edition = "2018"

[lib]
proc-macro = true

[features]
default = []
tokio_executor = []

[dependencies]
quote = "^1"
proc-macro2 = "^1"
syn = "^1"
24 changes: 24 additions & 0 deletions test_fn_macro/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use proc_macro::{
TokenStream,
};
use proc_macro2::{
TokenStream as TokenStream2,
};
use quote::quote;

#[proc_macro_attribute]
pub fn test(_args: TokenStream, input: TokenStream) -> TokenStream {
let input = TokenStream2::from(input);
#[cfg(feature = "tokio_executor")]
let attr = quote!{#[tokio::test]};
#[cfg(not(feature = "tokio_executor"))]
let attr = quote!{#[::core::prelude::v1::test]};
#[cfg(feature = "tokio_executor")]
let qual = quote!{async};
#[cfg(not(feature = "tokio_executor"))]
let qual = quote!{};
TokenStream::from(quote! {
#attr
#qual #input
})
}