Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add try_send method to the orchestra output channels #46

Merged
merged 8 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

//! Metered variant of bounded mpsc channels to be able to extract metrics.

use async_channel::{bounded, TryRecvError, TrySendError};
use async_channel::{bounded, TryRecvError};
use futures::{
stream::Stream,
task::{Context, Poll},
};
use std::{pin::Pin, result};

use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter};
pub use async_channel::TrySendError;

/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn channel<T>(capacity: usize) -> (MeteredSender<T>, MeteredReceiver<T>) {
Expand All @@ -44,9 +45,18 @@ pub struct MeteredReceiver<T> {

/// A bounded channel error
#[derive(thiserror::Error, Debug)]
pub enum SendError {
pub enum SendError<T> {
#[error("Bounded channel has been disconnected")]
Disconnected,
Disconnected(T),
}

impl<T> SendError<T> {
/// Returns the inner value.
pub fn into_inner(self) -> T {
match self {
Self::Disconnected(t) => t,
}
}
}

impl<T> std::ops::Deref for MeteredReceiver<T> {
Expand Down Expand Up @@ -164,39 +174,41 @@ impl<T> MeteredSender<T> {
}

/// Send message, wait until capacity is available.
pub async fn send(&mut self, msg: T) -> result::Result<(), SendError>
pub async fn send(&mut self, msg: T) -> result::Result<(), SendError<T>>
where
Self: Unpin,
{
match self.try_send(msg) {
Err(send_err) => {
if !send_err.is_full() {
return Err(SendError::Disconnected)
return Err(SendError::Disconnected(send_err.into_inner().into()))
}

let msg = send_err.into_inner();
let msg = send_err.into_inner().into();
self.meter.note_sent();
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|_| {
fut.await.map_err(|err| {
self.meter.retract_sent();
SendError::Disconnected
SendError::Disconnected(err.0.into())
})
},
_ => Ok(()),
}
}

/// Attempt to send message or fail immediately.
pub fn try_send(&mut self, msg: T) -> result::Result<(), TrySendError<MaybeTimeOfFlight<T>>> {
pub fn try_send(&mut self, msg: T) -> result::Result<(), TrySendError<T>> {
let msg = self.prepare_with_tof(msg);
self.inner.try_send(msg).map_err(|e| {
if e.is_full() {
// Count bounded channel sends that block.
self.meter.note_blocked();
}
self.meter.retract_sent();
e
match e {
TrySendError::Full(inner_error) => {
self.meter.note_blocked();
sandreim marked this conversation as resolved.
Show resolved Hide resolved
TrySendError::Full(inner_error.into())
},
TrySendError::Closed(inner_error) => TrySendError::Closed(inner_error.into()),
}
})
}

Expand Down
2 changes: 2 additions & 0 deletions orchestra/examples/duo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl<Context> AwesomeSubSys {
ctx.spawn(
"AwesomeSubsys",
Box::pin(async move {
sender.try_send_message(Plinko).unwrap();
sender.send_message(Plinko).await;
}),
)
Expand All @@ -53,6 +54,7 @@ impl<Context> Fortified {
ctx.spawn(
"GoblinTower",
Box::pin(async move {
sender.try_send_message(MsgStrukt(8u8)).unwrap();
sender.send_message(MsgStrukt(8u8)).await;
}),
)
Expand Down
37 changes: 36 additions & 1 deletion orchestra/proc-macro/src/impl_channels_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
signals_received: usize,
message: #message_wrapper,
) {

let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
Expand Down Expand Up @@ -101,6 +100,42 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
}
}

/// Try to send a message via a bounded channel.
pub fn try_send(
&mut self,
signals_received: usize,
message: #message_wrapper,
) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<()>> {
let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
self. #channel_name .try_send(
#support_crate ::make_packet(signals_received, inner)
).map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(_inner) => #support_crate ::metered::TrySendError::Full(()),
#support_crate ::metered::TrySendError::Closed(_inner) => #support_crate ::metered::TrySendError::Closed(()),
})
}
)*
// subsystems that are wip
#(
#message_wrapper :: #unconsumes_variant ( _ ) => Ok(()),
)*
// dummy message type
#message_wrapper :: Empty => Ok(()),

#[allow(unreachable_patterns)]
// And everything that's not WIP but no subsystem consumes it
unused_msg => {
#support_crate :: tracing :: warn!("Nothing consumes {:?}", unused_msg);
Ok(())
}
};

res
}

/// Send a message to another subsystem via an unbounded channel.
pub fn send_unbounded_and_log_error(
&self,
Expand Down
8 changes: 4 additions & 4 deletions orchestra/proc-macro/src/impl_orchestra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2::
#support_crate ::OrchestraError::SubsystemStalled(instance.name, "message", ::std::any::type_name::<M>())
))
}
Some(res) => res.map_err(|e| #error_ty :: from(
#support_crate ::OrchestraError::QueueError(e)
Some(res) => res.map_err(|_| #error_ty :: from(
drahnr marked this conversation as resolved.
Show resolved Hide resolved
#support_crate ::OrchestraError::QueueError
)),
}
} else {
Expand All @@ -256,8 +256,8 @@ pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2::
))
}
Some(res) => {
let res = res.map_err(|e| #error_ty :: from(
#support_crate ::OrchestraError::QueueError(e)
let res = res.map_err(|_| #error_ty :: from(
#support_crate ::OrchestraError::QueueError
));
if res.is_ok() {
instance.signals_received += 1;
Expand Down
10 changes: 10 additions & 0 deletions orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ pub(crate) fn impl_subsystem_sender(
).await;
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<()>>
{
self.channels.try_send(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
)
drahnr marked this conversation as resolved.
Show resolved Hide resolved
)
}

async fn send_messages<I>(&mut self, msgs: I)
where
I: IntoIterator<Item=OutgoingMessage> + Send,
Expand Down
16 changes: 14 additions & 2 deletions orchestra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ pub enum OrchestraError {
#[error(transparent)]
NotifyCancellation(#[from] oneshot::Canceled),

#[error(transparent)]
QueueError(#[from] metered::SendError),
#[error("Queue error")]
QueueError,

#[error("Failed to spawn task {0}")]
TaskSpawn(&'static str),
Expand All @@ -300,6 +300,12 @@ pub enum OrchestraError {
},
}

impl<T> From<metered::SendError<T>> for OrchestraError {
fn from(_err: metered::SendError<T>) -> Self {
Self::QueueError
}
}

/// Alias for a result with error type `OrchestraError`.
pub type OrchestraResult<T> = std::result::Result<T, self::OrchestraError>;

Expand Down Expand Up @@ -490,6 +496,12 @@ where
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: OutgoingMessage);

/// Tries to send a direct message to some other `Subsystem`, routed based on message type.
/// This method is useful for cases where the message queue is bounded and the message is ok
/// to be dropped if the queue is full. If the queue is full, this method will return an error.
/// This method is not async and will not block the current task.
fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), metered::TrySendError<()>>;
drahnr marked this conversation as resolved.
Show resolved Hide resolved

/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
async fn send_messages<I>(&mut self, msgs: I)
where
Expand Down