-
Notifications
You must be signed in to change notification settings - Fork 471
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce disconnection state for channels #1114
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -656,14 +656,52 @@ impl<T> Sender<T> { | |
_ => false, | ||
} | ||
} | ||
|
||
/// Disconnects the channel. | ||
/// | ||
/// Explicitly disconnects the channel and returns `true`, as if all instances of either side | ||
/// (sender or receiver) are dropped, unless it has been already disconnected. Otherwise, this | ||
/// method does nothing and returns `false`. | ||
/// | ||
/// The successful disconnect operation results in immediately rejecting sending subsequent | ||
/// messages to receivers. | ||
/// | ||
/// Disconnected channels can be restored connected by calling [`connect`], as long as there | ||
/// are at least a sender and a receiver for the channel. | ||
/// | ||
/// [`connect`]: Self::connect | ||
pub fn disconnect(&self) -> bool { | ||
match &self.flavor { | ||
SenderFlavor::List(chan) => chan.disconnect(), | ||
_ => todo!(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. obviously, i need to add support other flavors. this is true for other 3 I'll do this once i get some confirmation of the intent of merging this pr without much change. |
||
} | ||
} | ||
|
||
/// Connects the channel. | ||
/// | ||
/// Connects the disconnected channel and returns `true`, unless it has been already | ||
/// connected. Otherwise, this method does nothing and returns `false`. | ||
/// | ||
/// The successful connect operation results in immediately succeeding sending subsequent | ||
/// messages to receivers. | ||
/// | ||
/// Connected channels can be disconnected again by calling [`disconnect`]. | ||
/// | ||
/// [`disconnect`]: Self::disconnect | ||
pub fn connect(&self) -> bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note to self: rename to |
||
match &self.flavor { | ||
SenderFlavor::List(chan) => chan.connect(), | ||
_ => todo!(), | ||
} | ||
} | ||
} | ||
|
||
impl<T> Drop for Sender<T> { | ||
fn drop(&mut self) { | ||
unsafe { | ||
match &self.flavor { | ||
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), | ||
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), | ||
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders_to_drop()), | ||
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), | ||
} | ||
} | ||
|
@@ -1153,14 +1191,57 @@ impl<T> Receiver<T> { | |
_ => false, | ||
} | ||
} | ||
|
||
/// Disconnects the channel. | ||
/// | ||
/// Explicitly disconnects the channel and returns `true`, as if all instances of either side | ||
/// (receiver and sender) are dropped, unless it has been already disconnected. Otherwise, this | ||
/// method does nothing and returns `false`. Also, this method does nothing and returns | ||
/// `false` as well, if this method is called on channels other than bounded-capacity, | ||
/// unbounded-capacity or zero-capacity channels. | ||
Comment on lines
+1200
to
+1201
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not coded yet. but i hope this isn't controversial behavior... ;) |
||
/// | ||
/// The successful disconnect operation results in immediately rejecting receiving messages, | ||
/// unless they are sent already but not received yet. | ||
/// | ||
/// Disconnected channels can be restored connected by calling [`connect`], as long as there | ||
/// are at least a sender and a receiver for the channel. | ||
/// | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe mention about race condition among other disconnect()s and drop of the last instance of sender/receiver... |
||
/// [`connect`]: Self::connect | ||
pub fn disconnect(&self) -> bool { | ||
match &self.flavor { | ||
ReceiverFlavor::List(chan) => chan.disconnect(), | ||
_ => todo!(), | ||
} | ||
} | ||
|
||
/// Connects the channel. | ||
/// | ||
/// Connects the disconnected channel and returns `true`, unless it has been already | ||
/// connected. Otherwise, this method does nothing and returns `false`. Also, this method does | ||
/// nothing and returns `false` as well, if this method is called on channels other than | ||
/// bounded-capacity, unbounded-capacity or zero-capacity channels. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oops, remove this empty line later.... |
||
/// | ||
/// The successful connect operation results in immediately succeeding receiving messages | ||
/// from senders. | ||
/// | ||
/// Connected channels can be disconnected again by calling [`disconnect`]. | ||
/// | ||
/// [`disconnect`]: Self::disconnect | ||
pub fn connect(&self) -> bool { | ||
match &self.flavor { | ||
ReceiverFlavor::List(chan) => chan.connect(), | ||
_ => todo!(), | ||
} | ||
} | ||
} | ||
|
||
impl<T> Drop for Receiver<T> { | ||
fn drop(&mut self) { | ||
unsafe { | ||
match &self.flavor { | ||
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), | ||
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), | ||
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers_to_drop()), | ||
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), | ||
ReceiverFlavor::At(_) => {} | ||
ReceiverFlavor::Tick(_) => {} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,11 +34,16 @@ const LAP: usize = 32; | |
// The maximum number of messages a block can hold. | ||
const BLOCK_CAP: usize = LAP - 1; | ||
// How many lower bits are reserved for metadata. | ||
const SHIFT: usize = 1; | ||
// Has two different purposes: | ||
// * If set in head, indicates that the block is not the last one. | ||
// * If set in tail, indicates that the channel is disconnected. | ||
const MARK_BIT: usize = 1; | ||
Comment on lines
-37
to
-41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw, let me know if this flag dedup should be done as a preparatory pr. |
||
const SHIFT: usize = 2; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (note to self): HEAD_SHIFT = 1 and TAIL_SHIFT = 2? |
||
|
||
const HEAD_NOT_LAST: usize = 1; | ||
|
||
// Channel is disconnected implicitly via drop | ||
const TAIL_DISCONNECT_IMPLICIT: usize = 1; | ||
// Channel is disconnected explicitly as requested | ||
const TAIL_DISCONNECT_EXPLICIT: usize = 2; | ||
|
||
const TAIL_DISCONNECT_ANY: usize = TAIL_DISCONNECT_IMPLICIT | TAIL_DISCONNECT_EXPLICIT; | ||
|
||
/// A slot in a block. | ||
struct Slot<T> { | ||
|
@@ -204,7 +209,7 @@ impl<T> Channel<T> { | |
|
||
loop { | ||
// Check if the channel is disconnected. | ||
if tail & MARK_BIT != 0 { | ||
if tail & TAIL_DISCONNECT_ANY != 0 { | ||
token.list.block = ptr::null(); | ||
return true; | ||
} | ||
|
@@ -317,14 +322,14 @@ impl<T> Channel<T> { | |
|
||
let mut new_head = head + (1 << SHIFT); | ||
|
||
if new_head & MARK_BIT == 0 { | ||
if new_head & HEAD_NOT_LAST == 0 { | ||
atomic::fence(Ordering::SeqCst); | ||
let tail = self.tail.index.load(Ordering::Relaxed); | ||
|
||
// If the tail equals the head, that means the channel is empty. | ||
if head >> SHIFT == tail >> SHIFT { | ||
// If the channel is disconnected... | ||
if tail & MARK_BIT != 0 { | ||
if tail & TAIL_DISCONNECT_ANY != 0 { | ||
// ...then receive an error. | ||
token.list.block = ptr::null(); | ||
return true; | ||
|
@@ -334,9 +339,9 @@ impl<T> Channel<T> { | |
} | ||
} | ||
|
||
// If head and tail are not in the same block, set `MARK_BIT` in head. | ||
// If head and tail are not in the same block, set `HEAD_NOT_LAST` in head. | ||
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { | ||
new_head |= MARK_BIT; | ||
new_head |= HEAD_NOT_LAST; | ||
} | ||
} | ||
|
||
|
@@ -360,9 +365,9 @@ impl<T> Channel<T> { | |
// If we've reached the end of the block, move to the next one. | ||
if offset + 1 == BLOCK_CAP { | ||
let next = (*block).wait_next(); | ||
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); | ||
let mut next_index = (new_head & !HEAD_NOT_LAST).wrapping_add(1 << SHIFT); | ||
if !(*next).next.load(Ordering::Relaxed).is_null() { | ||
next_index |= MARK_BIT; | ||
next_index |= HEAD_NOT_LAST; | ||
} | ||
|
||
self.head.block.store(next, Ordering::Release); | ||
|
@@ -538,24 +543,53 @@ impl<T> Channel<T> { | |
/// Disconnects senders and wakes up all blocked receivers. | ||
/// | ||
/// Returns `true` if this call disconnected the channel. | ||
pub(crate) fn disconnect_senders(&self) -> bool { | ||
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); | ||
pub(crate) fn disconnect_senders_to_drop(&self) -> bool { | ||
let tail = self | ||
.tail | ||
.index | ||
.fetch_or(TAIL_DISCONNECT_IMPLICIT, Ordering::SeqCst); | ||
|
||
if tail & TAIL_DISCONNECT_IMPLICIT == 0 { | ||
self.receivers.disconnect(); | ||
true | ||
} else { | ||
false | ||
} | ||
} | ||
|
||
pub(crate) fn disconnect(&self) -> bool { | ||
let tail = self | ||
.tail | ||
.index | ||
.fetch_or(TAIL_DISCONNECT_EXPLICIT, Ordering::SeqCst); | ||
|
||
if tail & MARK_BIT == 0 { | ||
if tail & TAIL_DISCONNECT_ANY == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the use of TAIL_DISCONNECT_ANY, instead of TAIL_DISCONNECT_EXPLICIT is intentional and needs a comment... |
||
self.receivers.disconnect(); | ||
true | ||
} else { | ||
false | ||
} | ||
} | ||
|
||
pub(crate) fn connect(&self) -> bool { | ||
let tail = self | ||
.tail | ||
.index | ||
.fetch_and(!TAIL_DISCONNECT_EXPLICIT, Ordering::SeqCst); | ||
|
||
tail & TAIL_DISCONNECT_ANY == TAIL_DISCONNECT_EXPLICIT | ||
} | ||
|
||
/// Disconnects receivers. | ||
/// | ||
/// Returns `true` if this call disconnected the channel. | ||
pub(crate) fn disconnect_receivers(&self) -> bool { | ||
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); | ||
pub(crate) fn disconnect_receivers_to_drop(&self) -> bool { | ||
let tail = self | ||
.tail | ||
.index | ||
.fetch_or(TAIL_DISCONNECT_IMPLICIT, Ordering::SeqCst); | ||
|
||
if tail & MARK_BIT == 0 { | ||
if tail & TAIL_DISCONNECT_IMPLICIT == 0 { | ||
// If receivers are dropped first, discard all messages to free | ||
// memory eagerly. | ||
self.discard_all_messages(); | ||
|
@@ -628,13 +662,13 @@ impl<T> Channel<T> { | |
drop(Box::from_raw(block)); | ||
} | ||
} | ||
head &= !MARK_BIT; | ||
head &= !HEAD_NOT_LAST; | ||
self.head.index.store(head, Ordering::Release); | ||
} | ||
|
||
/// Returns `true` if the channel is disconnected. | ||
pub(crate) fn is_disconnected(&self) -> bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it's a prime time to expose this to public api? (i know there were controversial in the past..) |
||
self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 | ||
self.tail.index.load(Ordering::SeqCst) & TAIL_DISCONNECT_ANY != 0 | ||
} | ||
|
||
/// Returns `true` if the channel is empty. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -580,3 +580,84 @@ fn channel_through_channel() { | |
}) | ||
.unwrap(); | ||
} | ||
|
||
#[test] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe better off writing a test for possible race condition, where a sending thread disconnect/reconnect/send(T) successively , and the receiver thread may/may not see disconnected, or actual message. |
||
fn disconnect_by_sender() { | ||
let (s, r) = unbounded::<()>(); | ||
let s2 = s.clone(); | ||
|
||
assert!(s.disconnect()); | ||
|
||
assert!(!s.disconnect()); | ||
assert!(!s2.disconnect()); | ||
|
||
drop(r); | ||
} | ||
|
||
#[test] | ||
fn connect_by_sender() { | ||
let (s, r) = unbounded::<()>(); | ||
assert!(s.disconnect()); | ||
|
||
assert!(s.connect()); | ||
assert!(s.disconnect()); | ||
|
||
drop(r); | ||
// connect should fail after all receivers has gone | ||
assert!(!s.connect()); | ||
Comment on lines
+606
to
+607
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also disconnect should fail after all receivers has gone even if the channel isn't disconnected yet. |
||
} | ||
|
||
#[test] | ||
fn disconnect_by_receiver() { | ||
let (s, r) = unbounded::<()>(); | ||
let r2 = r.clone(); | ||
assert!(r.disconnect()); | ||
assert!(!r.disconnect()); | ||
assert!(!r2.disconnect()); | ||
drop(s); | ||
} | ||
|
||
#[test] | ||
fn connect_by_receiver() { | ||
let (s, r) = unbounded::<()>(); | ||
assert!(r.disconnect()); | ||
|
||
assert!(r.connect()); | ||
assert!(r.disconnect()); | ||
|
||
drop(s); | ||
// connect should fail after all senders has gone | ||
assert!(!r.connect()); | ||
} | ||
|
||
#[test] | ||
fn send_after_disconnect_then_connect() { | ||
let (s, r) = unbounded::<()>(); | ||
|
||
assert!(s.disconnect()); | ||
assert_eq!(s.send(()), Err(SendError(()))); | ||
|
||
assert!(s.connect()); | ||
assert_eq!(s.send(()), Ok(())); | ||
|
||
drop(r); | ||
} | ||
|
||
#[test] | ||
fn receive_after_disconnect_then_connect() { | ||
let (s, r) = unbounded::<()>(); | ||
s.send(()).unwrap(); | ||
|
||
assert!(r.disconnect()); | ||
assert_eq!(r.recv(), Ok(())); | ||
assert_eq!( | ||
r.recv_timeout(Duration::from_millis(1)), | ||
Err(RecvTimeoutError::Disconnected) | ||
); | ||
|
||
assert!(r.connect()); | ||
assert_eq!( | ||
r.recv_timeout(Duration::from_millis(1)), | ||
Err(RecvTimeoutError::Timeout) | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess i need to add some motivating example here for the justification of disconnect()...