Skip to content

Commit

Permalink
Custom Debug implementations for mpsc (#2667)
Browse files Browse the repository at this point in the history
  • Loading branch information
daxpedda authored Nov 27, 2022
1 parent c938001 commit be27d72
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
47 changes: 37 additions & 10 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,11 @@ mod queue;
#[cfg(feature = "sink")]
mod sink_impl;

#[derive(Debug)]
struct UnboundedSenderInner<T> {
// Channel state shared between the sender and receiver.
inner: Arc<UnboundedInner<T>>,
}

#[derive(Debug)]
struct BoundedSenderInner<T> {
// Channel state shared between the sender and receiver.
inner: Arc<BoundedInner<T>>,
Expand All @@ -122,13 +120,11 @@ impl<T> Unpin for BoundedSenderInner<T> {}
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Sender<T>(Option<BoundedSenderInner<T>>);

/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);

trait AssertKinds: Send + Sync + Clone {}
Expand All @@ -137,15 +133,13 @@ impl AssertKinds for UnboundedSender<u32> {}
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
inner: Option<Arc<BoundedInner<T>>>,
}

/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
inner: Option<Arc<UnboundedInner<T>>>,
}
Expand Down Expand Up @@ -255,7 +249,6 @@ impl fmt::Display for TryRecvError {

impl std::error::Error for TryRecvError {}

#[derive(Debug)]
struct UnboundedInner<T> {
// Internal channel state. Consists of the number of messages stored in the
// channel as well as a flag signalling that the channel is closed.
Expand All @@ -271,7 +264,6 @@ struct UnboundedInner<T> {
recv_task: AtomicWaker,
}

#[derive(Debug)]
struct BoundedInner<T> {
// Max buffer size of the channel.
buffer: usize,
Expand All @@ -294,7 +286,7 @@ struct BoundedInner<T> {
}

// Struct representation of `Inner::state`.
#[derive(Debug, Clone, Copy)]
#[derive(Clone, Copy)]
struct State {
// `true` when the channel is open
is_open: bool,
Expand All @@ -318,7 +310,6 @@ const MAX_CAPACITY: usize = !(OPEN_MASK);
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;

// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
task: Option<Waker>,
is_parked: bool,
Expand Down Expand Up @@ -941,6 +932,18 @@ impl<T> Drop for BoundedSenderInner<T> {
}
}

impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
}
}

impl<T> fmt::Debug for UnboundedSender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
}
}

/*
*
* ===== impl Receiver =====
Expand Down Expand Up @@ -1101,6 +1104,18 @@ impl<T> Drop for Receiver<T> {
}
}

impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let closed = if let Some(ref inner) = self.inner {
decode_state(inner.state.load(SeqCst)).is_closed()
} else {
false
};

f.debug_struct("Receiver").field("closed", &closed).finish()
}
}

impl<T> UnboundedReceiver<T> {
/// Closes the receiving half of a channel, without dropping it.
///
Expand Down Expand Up @@ -1233,6 +1248,18 @@ impl<T> Drop for UnboundedReceiver<T> {
}
}

impl<T> fmt::Debug for UnboundedReceiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let closed = if let Some(ref inner) = self.inner {
decode_state(inner.state.load(SeqCst)).is_closed()
} else {
false
};

f.debug_struct("Receiver").field("closed", &closed).finish()
}
}

/*
*
* ===== impl Inner =====
Expand Down
2 changes: 0 additions & 2 deletions futures-channel/src/mpsc/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ pub(super) enum PopResult<T> {
Inconsistent,
}

#[derive(Debug)]
struct Node<T> {
next: AtomicPtr<Self>,
value: Option<T>,
Expand All @@ -70,7 +69,6 @@ struct Node<T> {
/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
#[derive(Debug)]
pub(super) struct Queue<T> {
head: AtomicPtr<Node<T>>,
tail: UnsafeCell<*mut Node<T>>,
Expand Down

0 comments on commit be27d72

Please sign in to comment.