Skip to content

Commit

Permalink
Fix Token...
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Mar 8, 2024
1 parent bf1ff3e commit 8727c37
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 26 deletions.
4 changes: 2 additions & 2 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Resul
unsafe {
match &s.flavor {
SenderFlavor::Array(chan) => chan.write(token, msg),
SenderFlavor::List(chan) => chan.write(token, msg),
SenderFlavor::List(chan) => chan.write(&mut token.list, msg),
SenderFlavor::Zero(chan) => chan.write(token, msg),
}
}
Expand All @@ -1525,7 +1525,7 @@ pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()
unsafe {
match &r.flavor {
ReceiverFlavor::Array(chan) => chan.read(token),
ReceiverFlavor::List(chan) => chan.read(token),
ReceiverFlavor::List(chan) => chan.read(&mut token.list),
ReceiverFlavor::Zero(chan) => chan.read(token),
ReceiverFlavor::At(chan) => {
mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
Expand Down
45 changes: 21 additions & 24 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl<T> Channel<T> {
}

/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) {
fn start_send(&self, token: &mut ListToken) {
let backoff = Backoff::new();
let mut next_block = ptr::null_mut::<Block<T>>();

Expand All @@ -215,7 +215,7 @@ impl<T> Channel<T> {

// Check if the channel is disconnected.
if tail & MARK_BIT != 0 {
token.list.block = ptr::null();
token.block = ptr::null();
break;
}

Expand Down Expand Up @@ -266,8 +266,8 @@ impl<T> Channel<T> {
(*block).next.store(n, Ordering::Release);
}

token.list.block = block as *const u8;
token.list.offset = offset;
token.block = block as *const u8;
token.offset = offset;
break;
},
Err(_) => {
Expand All @@ -282,15 +282,15 @@ impl<T> Channel<T> {
}

/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
pub(crate) unsafe fn write(&self, token: &mut ListToken, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.list.block.is_null() {
if token.block.is_null() {
return Err(msg);
}

// Write the message into the slot.
let block = token.list.block.cast::<Block<T>>();
let offset = token.list.offset;
let block = token.block.cast::<Block<T>>();
let offset = token.offset;
let slot = unsafe { (*block).get_slot_unchecked(offset) };
unsafe { slot.msg.get().write(msg) }
slot.state.fetch_or(WRITE, Ordering::Release);
Expand All @@ -301,7 +301,7 @@ impl<T> Channel<T> {
}

/// Attempts to reserve a slot for receiving a message.
fn start_recv(&self, token: &mut Token) -> bool {
fn start_recv(&self, token: &mut ListToken) -> bool {
let backoff = Backoff::new();
let mut head;
let mut block;
Expand Down Expand Up @@ -330,7 +330,7 @@ impl<T> Channel<T> {
// If the channel is disconnected...
if tail & MARK_BIT != 0 {
// ...then receive an error.
token.list.block = ptr::null();
token.block = ptr::null();
return true;
} else {
// Otherwise, the receive operation is not ready.
Expand Down Expand Up @@ -371,8 +371,8 @@ impl<T> Channel<T> {
self.head.index.store(next_index, Ordering::Release);
}

token.list.block = block as *const u8;
token.list.offset = offset;
token.block = block as *const u8;
token.offset = offset;
return true;
},
Err(_) => {
Expand All @@ -383,15 +383,15 @@ impl<T> Channel<T> {
}

/// Reads a message from the channel.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.list.block.is_null() {
pub(crate) unsafe fn read(&self, token: &mut ListToken) -> Result<T, ()> {
if token.block.is_null() {
// The channel is disconnected.
return Err(());
}

// Read the message.
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let block = token.block as *mut Block<T>;
let offset = token.offset;
let slot = unsafe { (*block).get_slot_unchecked(offset) };
slot.wait_write();
let msg = unsafe { slot.msg.get().read() };
Expand Down Expand Up @@ -423,8 +423,7 @@ impl<T> Channel<T> {
msg: T,
_deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = MaybeUninit::uninit();
let token = &mut unsafe { token.assume_init() };
let token = &mut ListToken::default();

self.start_send(token);
unsafe {
Expand All @@ -435,8 +434,7 @@ impl<T> Channel<T> {

/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = MaybeUninit::uninit();
let token = &mut unsafe { token.assume_init() };
let token = &mut ListToken::default();

if self.start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
Expand All @@ -447,8 +445,7 @@ impl<T> Channel<T> {

/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = MaybeUninit::uninit();
let token = &mut unsafe { token.assume_init() };
let token = &mut ListToken::default();

loop {
// Try receiving a message several times.
Expand Down Expand Up @@ -701,7 +698,7 @@ pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_recv(token)
self.0.start_recv(&mut token.list)
}

fn deadline(&self) -> Option<Instant> {
Expand Down Expand Up @@ -737,7 +734,7 @@ impl<T> SelectHandle for Receiver<'_, T> {

impl<T> SelectHandle for Sender<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token);
self.0.start_send(&mut token.list);
true
}

Expand Down

0 comments on commit 8727c37

Please sign in to comment.