Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a subscription manager #548

Merged
merged 16 commits into from
May 27, 2020
Merged
1 change: 1 addition & 0 deletions pubsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern crate log;

mod delegates;
mod handler;
pub mod manager;
pub mod oneshot;
mod subscription;
pub mod typed;
Expand Down
115 changes: 115 additions & 0 deletions pubsub/src/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! Provides an executor for subscription Futures.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should expand on this module description a bit. The subscription manager is opinionated way to handle processing subscriptions that are based on Stream coming from the user codebase.
The manager takes care of:

  1. Assigning the ID
  2. Consuming a Stream of events (coming from user code) and transforming it into a subscription notifications to the client.
  3. Spawning a resulting future to executor
  4. Cancelling the stream when the subscription is canceled.


use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{
atomic::{self, AtomicUsize},
Arc,
};

use crate::core::futures::sync::oneshot;
use crate::core::futures::{future, Future};
use crate::{
typed::{Sink, Subscriber},
SubscriptionId,
};
use log::{error, warn};
use parking_lot::Mutex;

/// Alias for an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;

/// Trait used to provide unique subscription ids.
pub trait IdProvider {
// TODO: Maybe have this impl Into<u64>?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to have String ids as well.

type Id: Clone + Default + Eq + Hash;

/// Returns next id for the subscription.
fn next_id(&self) -> Self::Id;
}

/// Trait used to drive subscription Futures to completion.
pub trait SubscriptionManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the trait is really needed, I'd rather have Manager with a default generic parameter of u64 subscription IdProvider. Mainly to avoid importing extra traits to call functions.
Do you have some use cases in mind where having it behind trait would be useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my reasoning kinda was that if people wanted a "default" implementation of a SubscriptionManager they'd use the Manager struct, and if they wanted to roll their own they just had to conform to the SubscriptionManager trait. Is it reasonable to expect that people will want to use something other than the "default" implementation? If not, then yeah we don't need to the trait.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the trait is not really used in the library at all I think a concrete implementation is enough. If someone is building their own solution and needs that abstraction they can introduce a trait themselves and implement it for the type from our library.

/// Create a new `SubscriptionManager`.
fn new(&self) -> Self;
/// Borrows the internal task executor.
///
/// This can be used to spawn additional tasks on the underlying event loop.
fn executor(&self) -> &TaskExecutor;
/// Create new subscription for given subscriber.
///
/// Second parameter is a function that converts Subscriber sink into a future.
/// This future will be driven to completion by the underlying event loop
/// or will be cancelled in case #cancel is invoked.
fn add<T, E, G, R, F, N>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId<N>
where
G: FnOnce(Sink<T, E>) -> R,
R: future::IntoFuture<Future = F, Item = (), Error = ()>,
F: future::Future<Item = (), Error = ()> + Send + 'static;
/// Cancel subscription.
///
/// Should true if subscription existed or false otherwise.
fn cancel<N>(&self, id: SubscriptionId<N>) -> bool;
}

/// Subscriptions manager.
///
/// Takes care of assigning unique subscription ids and
/// driving the sinks into completion.
#[derive(Clone)]
pub struct Manager<I: Default + IdProvider> {
next_id: I,
active_subscriptions: Arc<Mutex<HashMap<I::Id, oneshot::Sender<()>>>>,
HCastano marked this conversation as resolved.
Show resolved Hide resolved
executor: TaskExecutor, // Make generic?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO It's fine, in most workloads spawning subscription is not going to be a bottleneck so we can safely do a virtual call here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
executor: TaskExecutor, // Make generic?
executor: TaskExecutor,

}

impl<I: Default + IdProvider> SubscriptionManager for Manager<I> {
fn new(&self) -> Self {
Self {
next_id: Default::default(),
active_subscriptions: Default::default(),
executor: self.executor,
}
}

fn executor(&self) -> &TaskExecutor {
&self.executor
}

fn add<T, E, G, R, F, N>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId<N>
where
G: FnOnce(Sink<T, E>) -> R,
R: future::IntoFuture<Future = F, Item = (), Error = ()>,
F: future::Future<Item = (), Error = ()> + Send + 'static,
{
let id = self.next_id.next_id();
let subscription_id: SubscriptionId = id.into();
if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
let (tx, rx) = oneshot::channel();
let future = into_future(sink)
.into_future()
.select(rx.map_err(|e| warn!("Error timing out: {:?}", e)))
.then(|_| Ok(()));

self.active_subscriptions.lock().insert(id, tx);
if self.executor.execute(Box::new(future)).is_err() {
error!("Failed to spawn RPC subscription task");
}
}

subscription_id
}

/// Cancel subscription.
///
/// Returns true if subscription existed or false otherwise.
fn cancel<N>(&self, id: SubscriptionId<N>) -> bool {
if let SubscriptionId::Number(id) = id {
if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
let _ = tx.send(());
return true;
}
}
false
}
}
44 changes: 33 additions & 11 deletions pubsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::core::futures::sync::mpsc;
use std::sync::Arc;

use crate::subscription::Session;
use crate::manager::IdProvider;

/// Raw transport sink for specific client.
pub type TransportSender = mpsc::Sender<String>;
Expand Down Expand Up @@ -35,47 +36,68 @@ impl<T: PubSubMetadata> PubSubMetadata for Option<T> {
}

/// Unique subscription id.
///
/// NOTE Assigning same id to different requests will cause the previous request to be unsubscribed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionId {
/// U64 number
Number(u64),
pub enum SubscriptionId<N> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see what you meant with Into<u64> in the earlier comment. Why not require Into<SubscriptionId> instead? This together with some extra From implementations that we would add here (for numeric types and String as you did now) would allow us to keep that configurable without maiking SubscriptionId generic.

/// Number
Number(N),
/// String
String(String),
}

impl SubscriptionId {
impl<N> SubscriptionId<N> {
/// Parses `core::Value` into unique subscription id.
pub fn parse_value(val: &core::Value) -> Option<SubscriptionId> {
pub fn parse_value(val: &core::Value) -> Option<SubscriptionId<N>> {
match *val {
core::Value::String(ref val) => Some(SubscriptionId::String(val.clone())),
core::Value::Number(ref val) => val.as_u64().map(SubscriptionId::Number),
core::Value::Number(ref val) => val.as_u64().map(SubscriptionId::Number), // TODO: Fix
_ => None,
}
}
}

impl From<String> for SubscriptionId {
impl<N> From<String> for SubscriptionId<N> {
fn from(other: String) -> Self {
SubscriptionId::String(other)
}
}

impl From<u64> for SubscriptionId {
fn from(other: u64) -> Self {
impl<N, I: IdProvider> From<I> for SubscriptionId<N> {
fn from(other: I::Id) -> Self {
SubscriptionId::Number(other)
}
}

impl From<SubscriptionId> for core::Value {
fn from(sub: SubscriptionId) -> Self {
impl<N> From<SubscriptionId<N>> for core::Value {
fn from(sub: SubscriptionId<N>) -> Self {
match sub {
SubscriptionId::Number(val) => core::Value::Number(val.into()),
SubscriptionId::String(val) => core::Value::String(val),
}
}
}

macro_rules! impl_from_num {
($num:ty) => {
impl<N> From<$num> for SubscriptionId<N> {
fn from(other: $num) -> Self {
SubscriptionId::Number(other)
}
}
}
}

impl_from_num!(u8);
impl_from_num!(u16);
impl_from_num!(u32);
impl_from_num!(u64);

impl_from_num!(i8);
impl_from_num!(i16);
impl_from_num!(i32);
impl_from_num!(i64);

#[cfg(test)]
mod tests {
use super::SubscriptionId;
Expand Down