Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add force_send method to channel Sender #1135

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::time::{Duration, Instant};
use crate::context::Context;
use crate::counter;
use crate::err::{
RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
ForceSendError, RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError,
TrySendError,
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
Expand Down Expand Up @@ -410,6 +411,45 @@ impl<T> Sender<T> {
}
}

/// Sends a message to a channel without blocking. It will only fail if the channel is disconnected.
///
/// This method will either send a message into the channel immediately,
/// overwrite and return the last value in the channel or return an error if
/// the channel is full. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will send the message only if there
/// happens to be a receive operation on the other side of the channel at the same time.
///
/// ```
/// use crossbeam_channel::{bounded, ForceSendError};
///
/// let (s, r) = bounded(3);
/// assert_eq!(s.force_send(0), Ok(None));
/// assert_eq!(s.force_send(1), Ok(None));
/// assert_eq!(s.force_send(2), Ok(None));
/// assert_eq!(s.force_send(3), Ok(Some(2)));
/// assert_eq!(r.recv(), Ok(0));
/// assert_eq!(s.force_send(4), Ok(None));
/// assert_eq!(r.recv(), Ok(1));
/// assert_eq!(r.recv(), Ok(3));
/// assert_eq!(s.force_send(5), Ok(None));
/// assert_eq!(s.force_send(6), Ok(None));
/// assert_eq!(s.force_send(7), Ok(Some(6)));
/// assert_eq!(s.force_send(8), Ok(Some(7)));
/// assert_eq!(r.recv(), Ok(4));
/// assert_eq!(r.recv(), Ok(5));
/// assert_eq!(r.recv(), Ok(8));
/// drop(r);
/// assert_eq!(s.force_send(9), Err(ForceSendError::Disconnected(9)));
/// ``````
pub fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.force_send(msg),
SenderFlavor::List(chan) => chan.force_send(msg),
SenderFlavor::Zero(chan) => chan.force_send(msg),
}
}

/// Blocks the current thread until a message is sent or the channel is disconnected.
///
/// If the channel is full and not disconnected, this call will block until the send operation
Expand Down
64 changes: 64 additions & 0 deletions crossbeam-channel/src/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ use std::fmt;
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

/// An error returned from the [`force_send`] method.
///
/// The error contains the message being sent so it can be recovered.
///
/// [`force_send`]: super::Sender::force_send
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum ForceSendError<T> {
/// The message could not be sent because the channel is disconnected.
Disconnected(T),
}

/// An error returned from the [`try_send`] method.
///
/// The error contains the message being sent so it can be recovered.
Expand Down Expand Up @@ -209,6 +220,59 @@ impl<T> TrySendError<T> {
}
}

impl<T> fmt::Debug for ForceSendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Disconnected(..) => "Disconnected(..)".fmt(f),
}
}
}

impl<T> fmt::Display for ForceSendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Disconnected(..) => "sending on a disconnected channel".fmt(f),
}
}
}

impl<T: Send> error::Error for ForceSendError<T> {}

impl<T> From<SendError<T>> for ForceSendError<T> {
fn from(err: SendError<T>) -> Self {
match err {
SendError(t) => Self::Disconnected(t),
}
}
}

impl<T> ForceSendError<T> {
/// Unwraps the message.
///
/// # Examples
///
/// ```
/// use crossbeam_channel::bounded;
///
/// let (s, r) = bounded(0);
/// drop(r);
///
/// if let Err(err) = s.force_send("foo") {
/// assert_eq!(err.into_inner(), "foo");
/// }
/// ```
pub fn into_inner(self) -> T {
match self {
Self::Disconnected(v) => v,
}
}

/// Returns `true` if the send operation failed because the channel is disconnected.
pub fn is_disconnected(&self) -> bool {
matches!(self, Self::Disconnected(_))
}
}

impl<T> fmt::Debug for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendTimeoutError(..)".fmt(f)
Expand Down
23 changes: 22 additions & 1 deletion crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

Expand Down Expand Up @@ -325,6 +325,27 @@ impl<T> Channel<T> {
}
}

/// Force send a message into the channel. Only fails if the channel is disconnected
pub(crate) fn force_send(&self, mut msg: T) -> Result<Option<T>, ForceSendError<T>> {
let mut token = Token::default();
if self.start_send(&mut token) {
match unsafe { self.write(&mut token, msg) } {
Ok(()) => Ok(None),
Err(msg) => Err(ForceSendError::Disconnected(msg)),
}
} else {
let tail = self.tail.load(Ordering::Acquire);
let prev_index = match tail & (self.mark_bit - 1) {
0 => self.cap() - 1,
x => x - 1,
};
let queued_msg =
unsafe { (*self.buffer.get_unchecked(prev_index).msg.get()).assume_init_mut() };
std::mem::swap(&mut msg, queued_msg);
Ok(Some(msg))
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
11 changes: 10 additions & 1 deletion crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

Expand Down Expand Up @@ -417,6 +417,15 @@ impl<T> Channel<T> {
})
}

/// Forces a send, failing only if the channel is disconnected
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
match self.send(msg, None) {
Ok(()) => Ok(None),
Err(SendTimeoutError::Disconnected(err)) => Err(ForceSendError::Disconnected(err)),
Err(SendTimeoutError::Timeout(_)) => unreachable!(),
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
22 changes: 21 additions & 1 deletion crossbeam-channel/src/flavors/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{fmt, ptr};
use crossbeam_utils::Backoff;

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::Waker;

Expand Down Expand Up @@ -216,6 +216,26 @@ impl<T> Channel<T> {
}
}

/// Send the message if a receiver is connected, otherwise returns it to the sender
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
}
Ok(None)
} else if inner.is_disconnected {
Err(ForceSendError::Disconnected(msg))
} else {
Ok(Some(msg))
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
4 changes: 2 additions & 2 deletions crossbeam-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ pub use crate::{
after, at, bounded, never, tick, unbounded, IntoIter, Iter, Receiver, Sender, TryIter,
},
err::{
ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError,
ForceSendError, ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError,
SendError, SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError,
},
select::{Select, SelectedOperation},
};