Skip to content

Commit

Permalink
ref(system): Introduce a generic recipient type (#1622)
Browse files Browse the repository at this point in the history
`Recipient` is a type similar to `Addr`, but it is independent of
interfaces. This makes it suitable for inversion of control, where a
service would like to send messages to other services without knowing
their interfaces.

This may not be the final implementation of the IoC pattern. This
initial implementation only supports a single message and not entire
interfaces (or extending interfaces). It is modeled after `actix::Addr`.
In the future, Recipient may be refactored to support entire interfaces.
  • Loading branch information
jan-auer authored Nov 24, 2022
1 parent 2a0a6b5 commit 6addb56
Showing 1 changed file with 97 additions and 17 deletions.
114 changes: 97 additions & 17 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,35 +308,68 @@ pub trait FromMessage<M>: Interface {
fn from_message(message: M, sender: <Self::Response as MessageResponse>::Sender) -> Self;
}

/// The address of a [`Service`].
/// Abstraction over address types for service channels.
trait SendDispatch<M> {
/// The behavior declaring the return value when sending this message.
///
/// When this is implemented for a type bound to an [`Interface`], this is the same behavior as
/// used in [`FromMessage::Response`].
type Response: MessageResponse;

/// Sends a message to the service and returns the response.
///
/// See [`Addr::send`] for more information on a concrete type.
fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output;

/// Returns a trait object of this type.
fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>>;
}

/// An address to a [`Service`] implementing any interface that takes a given message.
///
/// The address of a [`Service`] allows you to [send](Self::send) messages to the service as
/// long as the service is running. It can be freely cloned.
pub struct Addr<I: Interface> {
tx: mpsc::UnboundedSender<I>,
queue_size: Arc<AtomicU64>,
/// This is similar to an [`Addr`], but it is bound to a single message rather than an interface. As
/// such, this type is not meant for communicating with a service implementation, but rather as a
/// handle to any service that can consume a given message. These can be back-channels or hooks that
/// are configured externally through Inversion of Control (IoC).
///
/// Recipients are created through [`Addr::recipient`].
pub struct Recipient<M, R> {
inner: Box<dyn SendDispatch<M, Response = R>>,
}

impl<I: Interface> fmt::Debug for Addr<I> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Addr")
.field("open", &!self.tx.is_closed())
.field("queue_size", &self.queue_size.load(Ordering::Relaxed))
.finish()
impl<M, R> Recipient<M, R>
where
R: MessageResponse,
{
/// Sends a message to the service and returns the response.
///
/// This is equivalent to [`send`](Addr::send) on the originating address.
pub fn send(&self, message: M) -> R::Output {
self.inner.send(message)
}
}

// Manually derive `Clone` since we do not require `I: Clone` and the Clone derive adds this
// constraint.
impl<I: Interface> Clone for Addr<I> {
// Manual implementation since `XSender` cannot require `Clone` for object safety.
impl<M, R: MessageResponse> Clone for Recipient<M, R> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
queue_size: self.queue_size.clone(),
inner: self.inner.to_trait_object(),
}
}
}

/// The address of a [`Service`].
///
/// Addresses allow to [send](Self::send) messages to a service that implements a corresponding
/// [`Interface`] as long as the service is running.
///
/// Addresses can be freely cloned. When the last clone is dropped, the message channel of the
/// service closes permanently, which signals to the service that it can shut down.
pub struct Addr<I: Interface> {
tx: mpsc::UnboundedSender<I>,
queue_size: Arc<AtomicU64>,
}

impl<I: Interface> Addr<I> {
/// Sends a message to the service and returns the response.
///
Expand All @@ -357,6 +390,53 @@ impl<I: Interface> Addr<I> {
self.tx.send(I::from_message(message, tx)).ok(); // it's ok to drop, the response will fail
rx
}

/// Returns a handle that can receive a given message independent of the interface.
///
/// See [`Recipient`] for more information and examples.
pub fn recipient<M>(self) -> Recipient<M, I::Response>
where
I: FromMessage<M>,
{
Recipient {
inner: Box::new(self),
}
}
}

impl<I: Interface> fmt::Debug for Addr<I> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Addr")
.field("open", &!self.tx.is_closed())
.field("queue_size", &self.queue_size.load(Ordering::Relaxed))
.finish()
}
}

// Manually derive `Clone` since we do not require `I: Clone` and the Clone derive adds this
// constraint.
impl<I: Interface> Clone for Addr<I> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
queue_size: self.queue_size.clone(),
}
}
}

impl<I, M> SendDispatch<M> for Addr<I>
where
I: Interface + FromMessage<M>,
{
type Response = <I as FromMessage<M>>::Response;

fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output {
Addr::send(self, message)
}

fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>> {
Box::new(self.clone())
}
}

/// Inbound channel for messages sent through an [`Addr`].
Expand Down

0 comments on commit 6addb56

Please sign in to comment.