From e69d05da38840fac1c81c60596beaef781781933 Mon Sep 17 00:00:00 2001 From: Paul Sbarra Date: Mon, 6 Nov 2023 21:37:17 -0600 Subject: [PATCH 1/2] provide a non-destructive mechanism to determine if a sink/stream are paired --- futures-util/src/lock/bilock.rs | 7 ++- futures-util/src/stream/stream/split.rs | 80 +++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 7ddc66ad2c..a89678e05f 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -149,6 +149,11 @@ impl BiLock { BiLockAcquire { bilock: self } } + /// Returns `true` only if the other `BiLock` originated from the same call to `BiLock::new`. + pub fn is_pair_of(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.arc, &other.arc) + } + /// Attempts to put the two "halves" of a `BiLock` back together and /// recover the original value. Succeeds only if the two `BiLock`s /// originated from the same call to `BiLock::new`. @@ -156,7 +161,7 @@ impl BiLock { where T: Unpin, { - if Arc::ptr_eq(&self.arc, &other.arc) { + if self.is_pair_of(&other) { drop(other); let inner = Arc::try_unwrap(self.arc) .ok() diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index e2034e0c27..1a7fdcb387 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -15,6 +15,13 @@ pub struct SplitStream(BiLock); impl Unpin for SplitStream {} +impl SplitStream { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair_of(&self, other: &SplitSink) -> bool { + other.is_pair_of(&self) + } +} + impl SplitStream { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are @@ -60,6 +67,13 @@ impl + Unpin, Item> SplitSink { } } +impl SplitSink { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair_of(&self, other: &SplitStream) -> bool { + self.lock.is_pair_of(&other.0) + } +} + impl, Item> SplitSink { fn poll_flush_slot( mut inner: Pin<&mut S>, @@ -142,3 +156,69 @@ impl fmt::Display for ReuniteError { #[cfg(feature = "std")] impl std::error::Error for ReuniteError {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{sink::Sink, stream::StreamExt}; + use core::marker::PhantomData; + + struct NopStream { + phantom: PhantomData, + } + + impl Stream for NopStream { + type Item = Item; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + todo!() + } + } + + impl Sink for NopStream { + type Error = (); + + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> { + todo!() + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + } + + #[test] + fn test_pairing() { + let s1 = NopStream::<()> { phantom: PhantomData }; + let (sink1, stream1) = s1.split(); + assert!(sink1.is_pair_of(&stream1)); + assert!(stream1.is_pair_of(&sink1)); + + let s2 = NopStream::<()> { phantom: PhantomData }; + let (sink2, stream2) = s2.split(); + assert!(sink2.is_pair_of(&stream2)); + assert!(stream2.is_pair_of(&sink2)); + + assert!(!sink1.is_pair_of(&stream2)); + assert!(!stream1.is_pair_of(&sink2)); + assert!(!sink2.is_pair_of(&stream1)); + assert!(!stream2.is_pair_of(&sink1)); + } +} From d284014894aee388eb3d05bf062398b8584871d7 Mon Sep 17 00:00:00 2001 From: Paul Sbarra Date: Sun, 12 Nov 2023 19:23:53 -0600 Subject: [PATCH 2/2] provide a mechanism to determine if io read/write halves are from the same stream --- futures-util/src/io/split.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/futures-util/src/io/split.rs b/futures-util/src/io/split.rs index 3f1b9af456..81d1e6dcb5 100644 --- a/futures-util/src/io/split.rs +++ b/futures-util/src/io/split.rs @@ -31,6 +31,13 @@ pub(super) fn split(t: T) -> (ReadHalf, WriteHalf< (ReadHalf { handle: a }, WriteHalf { handle: b }) } +impl ReadHalf { + /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &WriteHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + impl ReadHalf { /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back /// together. Succeeds only if the `ReadHalf` and `WriteHalf` are @@ -42,6 +49,13 @@ impl ReadHalf { } } +impl WriteHalf { + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &ReadHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + impl WriteHalf { /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back /// together. Succeeds only if the `ReadHalf` and `WriteHalf` are