Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom Debug implementations for mpsc #2667

Merged
merged 1 commit into from
Nov 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,11 @@ mod queue;
#[cfg(feature = "sink")]
mod sink_impl;

#[derive(Debug)]
struct UnboundedSenderInner<T> {
// Channel state shared between the sender and receiver.
inner: Arc<UnboundedInner<T>>,
}

#[derive(Debug)]
struct BoundedSenderInner<T> {
// Channel state shared between the sender and receiver.
inner: Arc<BoundedInner<T>>,
Expand All @@ -122,13 +120,11 @@ impl<T> Unpin for BoundedSenderInner<T> {}
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Sender<T>(Option<BoundedSenderInner<T>>);

/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);

trait AssertKinds: Send + Sync + Clone {}
Expand All @@ -137,15 +133,13 @@ impl AssertKinds for UnboundedSender<u32> {}
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
inner: Option<Arc<BoundedInner<T>>>,
}

/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
inner: Option<Arc<UnboundedInner<T>>>,
}
Expand Down Expand Up @@ -255,7 +249,6 @@ impl fmt::Display for TryRecvError {

impl std::error::Error for TryRecvError {}

#[derive(Debug)]
struct UnboundedInner<T> {
// Internal channel state. Consists of the number of messages stored in the
// channel as well as a flag signalling that the channel is closed.
Expand All @@ -271,7 +264,6 @@ struct UnboundedInner<T> {
recv_task: AtomicWaker,
}

#[derive(Debug)]
struct BoundedInner<T> {
// Max buffer size of the channel.
buffer: usize,
Expand All @@ -294,7 +286,7 @@ struct BoundedInner<T> {
}

// Struct representation of `Inner::state`.
#[derive(Debug, Clone, Copy)]
#[derive(Clone, Copy)]
struct State {
// `true` when the channel is open
is_open: bool,
Expand All @@ -318,7 +310,6 @@ const MAX_CAPACITY: usize = !(OPEN_MASK);
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;

// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
task: Option<Waker>,
is_parked: bool,
Expand Down Expand Up @@ -941,6 +932,18 @@ impl<T> Drop for BoundedSenderInner<T> {
}
}

impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
}
}

impl<T> fmt::Debug for UnboundedSender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
}
}

/*
*
* ===== impl Receiver =====
Expand Down Expand Up @@ -1101,6 +1104,18 @@ impl<T> Drop for Receiver<T> {
}
}

impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let closed = if let Some(ref inner) = self.inner {
decode_state(inner.state.load(SeqCst)).is_closed()
} else {
false
};

f.debug_struct("Receiver").field("closed", &closed).finish()
}
}

impl<T> UnboundedReceiver<T> {
/// Closes the receiving half of a channel, without dropping it.
///
Expand Down Expand Up @@ -1233,6 +1248,18 @@ impl<T> Drop for UnboundedReceiver<T> {
}
}

impl<T> fmt::Debug for UnboundedReceiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let closed = if let Some(ref inner) = self.inner {
decode_state(inner.state.load(SeqCst)).is_closed()
} else {
false
};

f.debug_struct("Receiver").field("closed", &closed).finish()
}
}

/*
*
* ===== impl Inner =====
Expand Down
2 changes: 0 additions & 2 deletions futures-channel/src/mpsc/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ pub(super) enum PopResult<T> {
Inconsistent,
}

#[derive(Debug)]
struct Node<T> {
next: AtomicPtr<Self>,
value: Option<T>,
Expand All @@ -70,7 +69,6 @@ struct Node<T> {
/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
#[derive(Debug)]
pub(super) struct Queue<T> {
head: AtomicPtr<Node<T>>,
tail: UnsafeCell<*mut Node<T>>,
Expand Down