Skip to content

Commit

Permalink
feat: add same_channel() method
Browse files Browse the repository at this point in the history
the same_channel() is familiar with futures_channel
Sender::same_receiver, however flume is a mpmc library, so use the
same_receiver or same_sender name may not fit
this PR allow users to check two Sender, Receiver, SendSink or
RecvStream are belong to the same channel or not, that may be helpful in
some cases
  • Loading branch information
Sherlock-Holo committed May 19, 2023
1 parent 8afdb02 commit 17d25c7
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ impl<'a, T> SendSink<'a, T> {
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}

/// Returns whether the SendSinks are belong to the same channel.
pub fn same_channel(&self, other: &Self) -> bool {
self.sender().same_channel(other.sender())
}
}

impl<'a, T> Sink<T> for SendSink<'a, T> {
Expand Down Expand Up @@ -503,6 +508,11 @@ impl<'a, T> RecvStream<'a, T> {
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}

/// Returns whether the SendSinks are belong to the same channel.
pub fn same_channel(&self, other: &Self) -> bool {
self.0.receiver.same_channel(&*other.0.receiver)
}
}

impl<'a, T> Clone for RecvStream<'a, T> {
Expand Down
10 changes: 10 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,11 @@ impl<T> Sender<T> {
shared: Arc::downgrade(&self.shared),
}
}

/// Returns whether the senders are belong to the same channel.
pub fn same_channel(&self, other: &Sender<T>) -> bool {
Arc::ptr_eq(&self.shared, &other.shared)
}
}

impl<T> Clone for Sender<T> {
Expand Down Expand Up @@ -927,6 +932,11 @@ impl<T> Receiver<T> {
pub fn receiver_count(&self) -> usize {
self.shared.receiver_count()
}

/// Returns whether the receivers are belong to the same channel.
pub fn same_channel(&self, other: &Receiver<T>) -> bool {
Arc::ptr_eq(&self.shared, &other.shared)
}
}

impl<T> Clone for Receiver<T> {
Expand Down
57 changes: 57 additions & 0 deletions tests/check_same_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#[test]
fn same_sender() {
let (tx1, _rx) = flume::unbounded::<()>();
let tx2 = tx1.clone();

assert!(tx1.same_channel(&tx2));

let (tx3, _rx) = flume::unbounded::<()>();

assert!(!tx1.same_channel(&tx3));
assert!(!tx2.same_channel(&tx3));
}

#[test]
fn same_receiver() {
let (_tx, rx1) = flume::unbounded::<()>();
let rx2 = rx1.clone();

assert!(rx1.same_channel(&rx2));

let (_tx, rx3) = flume::unbounded::<()>();

assert!(!rx1.same_channel(&rx3));
assert!(!rx2.same_channel(&rx3));
}

#[cfg(feature = "async")]
#[test]
fn same_send_sink() {
let (tx1, _rx) = flume::unbounded::<()>();
let tx1 = tx1.into_sink();
let tx2 = tx1.clone();

assert!(tx1.same_channel(&tx2));

let (tx3, _rx) = flume::unbounded::<()>();
let tx3 = tx3.into_sink();

assert!(!tx1.same_channel(&tx3));
assert!(!tx2.same_channel(&tx3));
}

#[cfg(feature = "async")]
#[test]
fn same_recv_stream() {
let (_tx, rx1) = flume::unbounded::<()>();
let rx1 = rx1.into_stream();
let rx2 = rx1.clone();

assert!(rx1.same_channel(&rx2));

let (_tx, rx3) = flume::unbounded::<()>();
let rx3 = rx3.into_stream();

assert!(!rx1.same_channel(&rx3));
assert!(!rx2.same_channel(&rx3));
}

0 comments on commit 17d25c7

Please sign in to comment.