diff --git a/metered-channel/Cargo.toml b/metered-channel/Cargo.toml index 0dcef3a..1290b13 100644 --- a/metered-channel/Cargo.toml +++ b/metered-channel/Cargo.toml @@ -31,4 +31,3 @@ tracing = { version = "0.1.35", features = ["log"] } default = ["async_channel"] async_channel = ["dep:async-channel"] futures_channel = [] - diff --git a/metered-channel/src/bounded.rs b/metered-channel/src/bounded.rs index 9033bd0..8a04bb4 100644 --- a/metered-channel/src/bounded.rs +++ b/metered-channel/src/bounded.rs @@ -22,7 +22,7 @@ use async_channel::{ #[cfg(feature = "futures_channel")] use futures::{ channel::mpsc::channel as bounded_channel, - channel::mpsc::{Receiver, Sender, TryRecvError}, + channel::mpsc::{Receiver, Sender, TryRecvError, TrySendError as FuturesTrySendError}, sink::SinkExt, }; @@ -80,6 +80,30 @@ pub enum TrySendError { Full(T), } +#[cfg(feature = "async_channel")] +impl From>> for TrySendError { + fn from(error: ChannelTrySendError>) -> Self { + match error { + ChannelTrySendError::Closed(val) => Self::Closed(val.into()), + ChannelTrySendError::Full(val) => Self::Full(val.into()), + } + } +} + +#[cfg(feature = "futures_channel")] +impl From>> for TrySendError { + fn from(error: FuturesTrySendError>) -> Self { + let disconnected = error.is_disconnected(); + let val = error.into_inner(); + let val = val.into(); + if disconnected { + Self::Closed(val) + } else { + Self::Full(val) + } + } +} + impl TrySendError { /// Returns the inner value. pub fn into_inner(self) -> T { @@ -323,11 +347,7 @@ impl MeteredSender { let msg = self.prepare_with_tof(msg); // note_sent is called in here self.inner.try_send(msg).map_err(|e| { self.meter.retract_sent(); // we didn't send it, so we need to undo the note_send - if e.is_full() { - TrySendError::Full(e.into_inner().into()) - } else { - TrySendError::Closed(e.into_inner().into()) - } + TrySendError::from(e) }) } @@ -337,11 +357,7 @@ impl MeteredSender { let msg = self.prepare_with_tof(msg); // note_sent is called in here self.inner.try_send(msg).map_err(|e| { self.meter.retract_sent(); // we didn't send it, so we need to undo the note_send - match e { - ChannelTrySendError::Full(inner_error) => TrySendError::Full(inner_error.into()), - ChannelTrySendError::Closed(inner_error) => - TrySendError::Closed(inner_error.into()), - } + TrySendError::from(e) }) } diff --git a/orchestra/Cargo.toml b/orchestra/Cargo.toml index cf951ac..e9237f2 100644 --- a/orchestra/Cargo.toml +++ b/orchestra/Cargo.toml @@ -39,7 +39,7 @@ name = "bench_main" harness = false [features] -default = ["deny_unconsumed_messages","deny_unsent_messages","async_channel"] +default = ["deny_unconsumed_messages", "deny_unsent_messages", "async_channel"] # Generate a file containing the generated code that # is used via `include_str!`. expand = ["orchestra-proc-macro/expand"]