Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce disconnection state for channels #1114

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,14 +656,52 @@ impl<T> Sender<T> {
_ => false,
}
}

/// Disconnects the channel.
///
/// Explicitly disconnects the channel and returns `true`, as if all instances of either side
/// (sender or receiver) are dropped, unless it has been already disconnected. Otherwise, this
/// method does nothing and returns `false`.
///
/// The successful disconnect operation results in immediately rejecting sending subsequent
/// messages to receivers.
///
/// Disconnected channels can be restored connected by calling [`connect`], as long as there
/// are at least a sender and a receiver for the channel.
///
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess i need to add some motivating example here for the justification of disconnect()...

/// [`connect`]: Self::connect
pub fn disconnect(&self) -> bool {
match &self.flavor {
SenderFlavor::List(chan) => chan.disconnect(),
_ => todo!(),
Copy link
Contributor Author

@ryoqun ryoqun May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

obviously, i need to add support other flavors. this is true for other 3 todo!()s.

I'll do this once i get some confirmation of the intent of merging this pr without much change.

}
}

/// Connects the channel.
///
/// Connects the disconnected channel and returns `true`, unless it has been already
/// connected. Otherwise, this method does nothing and returns `false`.
///
/// The successful connect operation results in immediately succeeding sending subsequent
/// messages to receivers.
///
/// Connected channels can be disconnected again by calling [`disconnect`].
///
/// [`disconnect`]: Self::disconnect
pub fn connect(&self) -> bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to self: rename to reconnect()?

match &self.flavor {
SenderFlavor::List(chan) => chan.connect(),
_ => todo!(),
}
}
}

impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders_to_drop()),
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
}
Expand Down Expand Up @@ -1153,14 +1191,57 @@ impl<T> Receiver<T> {
_ => false,
}
}

/// Disconnects the channel.
///
/// Explicitly disconnects the channel and returns `true`, as if all instances of either side
/// (receiver and sender) are dropped, unless it has been already disconnected. Otherwise, this
/// method does nothing and returns `false`. Also, this method does nothing and returns
/// `false` as well, if this method is called on channels other than bounded-capacity,
/// unbounded-capacity or zero-capacity channels.
Comment on lines +1200 to +1201
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not coded yet. but i hope this isn't controversial behavior... ;)

///
/// The successful disconnect operation results in immediately rejecting receiving messages,
/// unless they are sent already but not received yet.
///
/// Disconnected channels can be restored connected by calling [`connect`], as long as there
/// are at least a sender and a receiver for the channel.
///
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe mention about race condition among other disconnect()s and drop of the last instance of sender/receiver...

/// [`connect`]: Self::connect
pub fn disconnect(&self) -> bool {
match &self.flavor {
ReceiverFlavor::List(chan) => chan.disconnect(),
_ => todo!(),
}
}

/// Connects the channel.
///
/// Connects the disconnected channel and returns `true`, unless it has been already
/// connected. Otherwise, this method does nothing and returns `false`. Also, this method does
/// nothing and returns `false` as well, if this method is called on channels other than
/// bounded-capacity, unbounded-capacity or zero-capacity channels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, remove this empty line later....

///
/// The successful connect operation results in immediately succeeding receiving messages
/// from senders.
///
/// Connected channels can be disconnected again by calling [`disconnect`].
///
/// [`disconnect`]: Self::disconnect
pub fn connect(&self) -> bool {
match &self.flavor {
ReceiverFlavor::List(chan) => chan.connect(),
_ => todo!(),
}
}
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers_to_drop()),
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::At(_) => {}
ReceiverFlavor::Tick(_) => {}
Expand Down
74 changes: 54 additions & 20 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
Comment on lines -37 to -41
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, let me know if this flag dedup should be done as a preparatory pr.

const SHIFT: usize = 2;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(note to self): HEAD_SHIFT = 1 and TAIL_SHIFT = 2?


const HEAD_NOT_LAST: usize = 1;

// Channel is disconnected implicitly via drop
const TAIL_DISCONNECT_IMPLICIT: usize = 1;
// Channel is disconnected explicitly as requested
const TAIL_DISCONNECT_EXPLICIT: usize = 2;

const TAIL_DISCONNECT_ANY: usize = TAIL_DISCONNECT_IMPLICIT | TAIL_DISCONNECT_EXPLICIT;

/// A slot in a block.
struct Slot<T> {
Expand Down Expand Up @@ -204,7 +209,7 @@ impl<T> Channel<T> {

loop {
// Check if the channel is disconnected.
if tail & MARK_BIT != 0 {
if tail & TAIL_DISCONNECT_ANY != 0 {
token.list.block = ptr::null();
return true;
}
Expand Down Expand Up @@ -317,14 +322,14 @@ impl<T> Channel<T> {

let mut new_head = head + (1 << SHIFT);

if new_head & MARK_BIT == 0 {
if new_head & HEAD_NOT_LAST == 0 {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::Relaxed);

// If the tail equals the head, that means the channel is empty.
if head >> SHIFT == tail >> SHIFT {
// If the channel is disconnected...
if tail & MARK_BIT != 0 {
if tail & TAIL_DISCONNECT_ANY != 0 {
// ...then receive an error.
token.list.block = ptr::null();
return true;
Expand All @@ -334,9 +339,9 @@ impl<T> Channel<T> {
}
}

// If head and tail are not in the same block, set `MARK_BIT` in head.
// If head and tail are not in the same block, set `HEAD_NOT_LAST` in head.
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= MARK_BIT;
new_head |= HEAD_NOT_LAST;
}
}

Expand All @@ -360,9 +365,9 @@ impl<T> Channel<T> {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
let mut next_index = (new_head & !HEAD_NOT_LAST).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= MARK_BIT;
next_index |= HEAD_NOT_LAST;
}

self.head.block.store(next, Ordering::Release);
Expand Down Expand Up @@ -538,24 +543,53 @@ impl<T> Channel<T> {
/// Disconnects senders and wakes up all blocked receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
pub(crate) fn disconnect_senders_to_drop(&self) -> bool {
let tail = self
.tail
.index
.fetch_or(TAIL_DISCONNECT_IMPLICIT, Ordering::SeqCst);

if tail & TAIL_DISCONNECT_IMPLICIT == 0 {
self.receivers.disconnect();
true
} else {
false
}
}

pub(crate) fn disconnect(&self) -> bool {
let tail = self
.tail
.index
.fetch_or(TAIL_DISCONNECT_EXPLICIT, Ordering::SeqCst);

if tail & MARK_BIT == 0 {
if tail & TAIL_DISCONNECT_ANY == 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the use of TAIL_DISCONNECT_ANY, instead of TAIL_DISCONNECT_EXPLICIT is intentional and needs a comment...

self.receivers.disconnect();
true
} else {
false
}
}

pub(crate) fn connect(&self) -> bool {
let tail = self
.tail
.index
.fetch_and(!TAIL_DISCONNECT_EXPLICIT, Ordering::SeqCst);

tail & TAIL_DISCONNECT_ANY == TAIL_DISCONNECT_EXPLICIT
}

/// Disconnects receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect_receivers(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
pub(crate) fn disconnect_receivers_to_drop(&self) -> bool {
let tail = self
.tail
.index
.fetch_or(TAIL_DISCONNECT_IMPLICIT, Ordering::SeqCst);

if tail & MARK_BIT == 0 {
if tail & TAIL_DISCONNECT_IMPLICIT == 0 {
// If receivers are dropped first, discard all messages to free
// memory eagerly.
self.discard_all_messages();
Expand Down Expand Up @@ -628,13 +662,13 @@ impl<T> Channel<T> {
drop(Box::from_raw(block));
}
}
head &= !MARK_BIT;
head &= !HEAD_NOT_LAST;
self.head.index.store(head, Ordering::Release);
}

/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
Copy link
Contributor Author

@ryoqun ryoqun May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's a prime time to expose this to public api? (i know there were controversial in the past..)

self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
self.tail.index.load(Ordering::SeqCst) & TAIL_DISCONNECT_ANY != 0
}

/// Returns `true` if the channel is empty.
Expand Down
81 changes: 81 additions & 0 deletions crossbeam-channel/tests/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,84 @@ fn channel_through_channel() {
})
.unwrap();
}

#[test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better off writing a test for possible race condition, where a sending thread disconnect/reconnect/send(T) successively , and the receiver thread may/may not see disconnected, or actual message.

fn disconnect_by_sender() {
let (s, r) = unbounded::<()>();
let s2 = s.clone();

assert!(s.disconnect());

assert!(!s.disconnect());
assert!(!s2.disconnect());

drop(r);
}

#[test]
fn connect_by_sender() {
let (s, r) = unbounded::<()>();
assert!(s.disconnect());

assert!(s.connect());
assert!(s.disconnect());

drop(r);
// connect should fail after all receivers has gone
assert!(!s.connect());
Comment on lines +606 to +607
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also disconnect should fail after all receivers has gone even if the channel isn't disconnected yet.

}

#[test]
fn disconnect_by_receiver() {
let (s, r) = unbounded::<()>();
let r2 = r.clone();
assert!(r.disconnect());
assert!(!r.disconnect());
assert!(!r2.disconnect());
drop(s);
}

#[test]
fn connect_by_receiver() {
let (s, r) = unbounded::<()>();
assert!(r.disconnect());

assert!(r.connect());
assert!(r.disconnect());

drop(s);
// connect should fail after all senders has gone
assert!(!r.connect());
}

#[test]
fn send_after_disconnect_then_connect() {
let (s, r) = unbounded::<()>();

assert!(s.disconnect());
assert_eq!(s.send(()), Err(SendError(())));

assert!(s.connect());
assert_eq!(s.send(()), Ok(()));

drop(r);
}

#[test]
fn receive_after_disconnect_then_connect() {
let (s, r) = unbounded::<()>();
s.send(()).unwrap();

assert!(r.disconnect());
assert_eq!(r.recv(), Ok(()));
assert_eq!(
r.recv_timeout(Duration::from_millis(1)),
Err(RecvTimeoutError::Disconnected)
);

assert!(r.connect());
assert_eq!(
r.recv_timeout(Duration::from_millis(1)),
Err(RecvTimeoutError::Timeout)
);
}