Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add select_biased! macro #1040

Merged
merged 5 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions crossbeam-channel/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ enum Timeout {
fn run_select(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<(Token, usize, *const u8)> {
if handles.is_empty() {
// Wait until the timeout and return.
Expand All @@ -193,8 +194,10 @@ fn run_select(
}
}

// Shuffle the operations for fairness.
utils::shuffle(handles);
if !is_biased {
// Shuffle the operations for fairness.
utils::shuffle(handles);
}

// Create a token, which serves as a temporary variable that gets initialized in this function
// and is later used by a call to `channel::read()` or `channel::write()` that completes the
Expand Down Expand Up @@ -325,6 +328,7 @@ fn run_select(
fn run_ready(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<usize> {
if handles.is_empty() {
// Wait until the timeout and return.
Expand All @@ -341,8 +345,10 @@ fn run_ready(
}
}

// Shuffle the operations for fairness.
utils::shuffle(handles);
if !is_biased {
// Shuffle the operations for fairness.
utils::shuffle(handles);
}

loop {
let backoff = Backoff::new();
Expand Down Expand Up @@ -451,7 +457,7 @@ fn run_ready(
pub fn try_select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
) -> Result<SelectedOperation<'a>, TrySelectError> {
match run_select(handles, Timeout::Now) {
match run_select(handles, Timeout::Now, false) {
None => Err(TrySelectError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
Expand All @@ -467,12 +473,13 @@ pub fn try_select<'a>(
#[inline]
pub fn select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
is_biased: bool,
) -> SelectedOperation<'a> {
if handles.is_empty() {
panic!("no operations have been added to `Select`");
}

let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
SelectedOperation {
token,
index,
Expand All @@ -487,10 +494,11 @@ pub fn select<'a>(
pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => select_deadline(handles, deadline),
None => Ok(select(handles)),
Some(deadline) => select_deadline(handles, deadline, is_biased),
None => Ok(select(handles, is_biased)),
}
}

Expand All @@ -499,8 +507,9 @@ pub fn select_timeout<'a>(
pub(crate) fn select_deadline<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
deadline: Instant,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match run_select(handles, Timeout::At(deadline)) {
match run_select(handles, Timeout::At(deadline), is_biased) {
None => Err(SelectTimeoutError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
Expand Down Expand Up @@ -815,7 +824,7 @@ impl<'a> Select<'a> {
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
/// ```
pub fn select(&mut self) -> SelectedOperation<'a> {
select(&mut self.handles)
select(&mut self.handles, false)
}

/// Blocks for a limited time until one of the operations becomes ready and selects it.
Expand Down Expand Up @@ -869,7 +878,7 @@ impl<'a> Select<'a> {
&mut self,
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_timeout(&mut self.handles, timeout)
select_timeout(&mut self.handles, timeout, false)
}

/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
Expand Down Expand Up @@ -925,7 +934,7 @@ impl<'a> Select<'a> {
&mut self,
deadline: Instant,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_deadline(&mut self.handles, deadline)
select_deadline(&mut self.handles, deadline, false)
}

/// Attempts to find a ready operation without blocking.
Expand Down Expand Up @@ -964,7 +973,7 @@ impl<'a> Select<'a> {
/// }
/// ```
pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
match run_ready(&mut self.handles, Timeout::Now) {
match run_ready(&mut self.handles, Timeout::Now, false) {
None => Err(TryReadyError),
Some(index) => Ok(index),
}
Expand Down Expand Up @@ -1021,7 +1030,7 @@ impl<'a> Select<'a> {
panic!("no operations have been added to `Select`");
}

run_ready(&mut self.handles, Timeout::Never).unwrap()
run_ready(&mut self.handles, Timeout::Never, false).unwrap()
}

/// Blocks for a limited time until one of the operations becomes ready.
Expand Down Expand Up @@ -1122,7 +1131,7 @@ impl<'a> Select<'a> {
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
/// ```
pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
match run_ready(&mut self.handles, Timeout::At(deadline)) {
match run_ready(&mut self.handles, Timeout::At(deadline), false) {
None => Err(ReadyTimeoutError),
Some(index) => Ok(index),
}
Expand Down
38 changes: 32 additions & 6 deletions crossbeam-channel/src/select_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ macro_rules! crossbeam_channel_internal {
$cases:tt
) => {{
let _oper: $crate::SelectedOperation<'_> = {
let _oper = $crate::internal::select(&mut $sel);
let _oper = $crate::internal::select(&mut $sel, _IS_BIASED);

// Erase the lifetime so that `sel` can be dropped early even without NLL.
unsafe { ::std::mem::transmute(_oper) }
Expand Down Expand Up @@ -802,7 +802,7 @@ macro_rules! crossbeam_channel_internal {
$cases:tt
) => {{
let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = {
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout);
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, false);

// Erase the lifetime so that `sel` can be dropped early even without NLL.
unsafe { ::std::mem::transmute(_oper) }
Expand Down Expand Up @@ -985,7 +985,8 @@ macro_rules! crossbeam_channel_internal {
///
/// This macro allows you to define a set of channel operations, wait until any one of them becomes
/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
/// among them is selected.
/// among them is selected (i.e. the unbiased selection). Use `select_biased!` for the biased
/// selection.
///
/// It is also possible to define a `default` case that gets executed if none of the operations are
/// ready, either right away or for a certain duration of time.
Expand Down Expand Up @@ -1121,8 +1122,33 @@ macro_rules! crossbeam_channel_internal {
#[macro_export]
macro_rules! select {
($($tokens:tt)*) => {
$crate::crossbeam_channel_internal!(
$($tokens)*
)
{
const _IS_BIASED: bool = false;

$crate::crossbeam_channel_internal!(
$($tokens)*
)
}
};
}

/// Selects from a set of channel operations.
///
/// This macro allows you to define a list of channel operations, wait until any one of them
/// becomes ready, and finally execute it. If multiple operations are ready at the same time, the
/// operation nearest to the front of the list is always selected (i.e. the biased selection). Use
/// [`select!`] for the unbiased selection.
///
/// Otherwise, this macro's functionality is identical to [`select!`]. Refer to it for the syntax.
#[macro_export]
macro_rules! select_biased {
($($tokens:tt)*) => {
{
const _IS_BIASED: bool = true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this was easiest way to implement... but this is accessible to user code. however, it seems we have precedent: _LEN.


$crate::crossbeam_channel_internal!(
$($tokens)*
)
}
};
}
2 changes: 2 additions & 0 deletions crossbeam-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ macro_rules! select {
(
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
) => ({
const _IS_BIASED: bool = false;

cc::crossbeam_channel_internal! {
$(
$meth(($rx).inner) -> res => {
Expand Down
35 changes: 34 additions & 1 deletion crossbeam-channel/tests/select_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::ops::Deref;
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_channel::{after, bounded, never, select, tick, unbounded};
use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded};
use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
use crossbeam_utils::thread::scope;

Expand Down Expand Up @@ -943,6 +943,39 @@ fn fairness_send() {
assert!(hits.iter().all(|x| *x >= COUNT / 4));
}

#[test]
fn unfairness() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i omitted more test cases like send one. let me know if i need to add more...

#[cfg(miri)]
const COUNT: usize = 100;
#[cfg(not(miri))]
const COUNT: usize = 10_000;

let (s1, r1) = unbounded::<()>();
let (s2, r2) = unbounded::<()>();

for _ in 0..COUNT {
s1.send(()).unwrap();
s2.send(()).unwrap();
}

let mut hits = [0usize; 2];
for _ in 0..COUNT {
select_biased! {
recv(r1) -> _ => hits[0] += 1,
recv(r2) -> _ => hits[1] += 1,
}
}
assert_eq!(hits, [COUNT, 0]);

for _ in 0..COUNT {
select_biased! {
recv(r1) -> _ => hits[0] += 1,
recv(r2) -> _ => hits[1] += 1,
}
}
assert_eq!(hits, [COUNT, COUNT]);
}

#[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional.
#[test]
fn references() {
Expand Down