Skip to content

Commit

Permalink
address review and fix clippy failure
Browse files Browse the repository at this point in the history
  • Loading branch information
b-naber committed Apr 8, 2022
1 parent 470811b commit 5e46bd6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 26 deletions.
17 changes: 6 additions & 11 deletions tokio-util/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ async fn weak_sender() {

let tx_weak = tokio::spawn(async move {
for i in 0..10 {
if let Err(_) = tx.send(i).await {
if tx.send(i).await.is_err() {
return None;
}
}
Expand Down Expand Up @@ -339,19 +339,14 @@ async fn actor_weak_sender() {

async fn run(&mut self) {
let mut i = 0;
loop {
match self.receiver.recv().await {
Some(msg) => {
self.handle_message(msg);
}
None => {
break;
}
}
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);

if i == 0 {
self.send_message_to_self().await;
}
i += 1;

i += 1
}

assert!(self.received_self_msg);
Expand Down
32 changes: 17 additions & 15 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Weak};
use crate::loom::sync::Arc;
use crate::park::thread::CachedParkThread;
use crate::park::Park;
use crate::sync::mpsc::error::TryRecvError;
Expand All @@ -11,9 +11,11 @@ use crate::sync::notify::Notify;
use std::fmt;
use std::mem;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, SeqCst};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::Weak;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;

/// Channel sender.
pub(crate) struct Tx<T, S> {
Expand Down Expand Up @@ -181,28 +183,31 @@ impl<T, S> TxWeak<T, S> {
// even though the channel might have been closed in the meantime.
// Need to check here whether the channel was actually closed.

let mut tx_count = inner.tx_count.load(Relaxed);
let mut tx_count = inner.tx_count.load(Acquire);

if tx_count == 0 {
// channel is closed
mem::drop(inner);
return None;
}

loop {
// FIXME Haven't thought the orderings on the CAS through yet
match inner
.tx_count
.compare_exchange(tx_count, tx_count + 1, SeqCst, SeqCst)
.compare_exchange(tx_count, tx_count + 1, AcqRel, Acquire)
{
Ok(prev_count) => {
if prev_count == 0 {
mem::drop(inner);
return None;
}
assert!(prev_count != 0);

return Some(Tx::new(inner));
}
Err(count) => {
if count == 0 {
Err(prev_count) => {
if prev_count == 0 {
mem::drop(inner);
return None;
}

tx_count = count;
tx_count = prev_count;
}
}
}
Expand Down Expand Up @@ -441,9 +446,6 @@ impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {

// ===== impl Semaphore for AtomicUsize =====

use std::sync::atomic::Ordering::Release;
use std::usize;

impl Semaphore for AtomicUsize {
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
Expand Down

0 comments on commit 5e46bd6

Please sign in to comment.