Skip to content

Commit

Permalink
feat: add convenience conversion for TrySend error
Browse files Browse the repository at this point in the history
  • Loading branch information
drahnr committed Jul 21, 2023
1 parent 3edf423 commit ebb178f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
1 change: 0 additions & 1 deletion metered-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,3 @@ tracing = { version = "0.1.35", features = ["log"] }
default = ["async_channel"]
async_channel = ["dep:async-channel"]
futures_channel = []

38 changes: 27 additions & 11 deletions metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_channel::{
#[cfg(feature = "futures_channel")]
use futures::{
channel::mpsc::channel as bounded_channel,
channel::mpsc::{Receiver, Sender, TryRecvError},
channel::mpsc::{Receiver, Sender, TryRecvError, TrySendError as FuturesTrySendError},
sink::SinkExt,
};

Expand Down Expand Up @@ -80,6 +80,30 @@ pub enum TrySendError<T> {
Full(T),
}

#[cfg(feature = "async_channel")]
impl<T> From<ChannelTrySendError<MaybeTimeOfFlight<T>>> for TrySendError<T> {
fn from(error: ChannelTrySendError<MaybeTimeOfFlight<T>>) -> Self {
match error {
ChannelTrySendError::Closed(val) => Self::Closed(val.into()),
ChannelTrySendError::Full(val) => Self::Full(val.into()),
}
}
}

#[cfg(feature = "futures_channel")]
impl<T> From<FuturesTrySendError<MaybeTimeOfFlight<T>>> for TrySendError<T> {
fn from(error: FuturesTrySendError<MaybeTimeOfFlight<T>>) -> Self {
let disconnected = error.is_disconnected();
let val = error.into_inner();
let val = val.into();
if disconnected {
Self::Closed(val)
} else {
Self::Full(val)
}
}
}

impl<T> TrySendError<T> {
/// Returns the inner value.
pub fn into_inner(self) -> T {
Expand Down Expand Up @@ -323,11 +347,7 @@ impl<T> MeteredSender<T> {
let msg = self.prepare_with_tof(msg); // note_sent is called in here
self.inner.try_send(msg).map_err(|e| {
self.meter.retract_sent(); // we didn't send it, so we need to undo the note_send
if e.is_full() {
TrySendError::Full(e.into_inner().into())
} else {
TrySendError::Closed(e.into_inner().into())
}
TrySendError::from(e)
})
}

Expand All @@ -337,11 +357,7 @@ impl<T> MeteredSender<T> {
let msg = self.prepare_with_tof(msg); // note_sent is called in here
self.inner.try_send(msg).map_err(|e| {
self.meter.retract_sent(); // we didn't send it, so we need to undo the note_send
match e {
ChannelTrySendError::Full(inner_error) => TrySendError::Full(inner_error.into()),
ChannelTrySendError::Closed(inner_error) =>
TrySendError::Closed(inner_error.into()),
}
TrySendError::from(e)
})
}

Expand Down
2 changes: 1 addition & 1 deletion orchestra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ name = "bench_main"
harness = false

[features]
default = ["deny_unconsumed_messages","deny_unsent_messages","async_channel"]
default = ["deny_unconsumed_messages", "deny_unsent_messages", "async_channel"]
# Generate a file containing the generated code that
# is used via `include_str!`.
expand = ["orchestra-proc-macro/expand"]
Expand Down

0 comments on commit ebb178f

Please sign in to comment.