From 1779dbef95dd5189d73516782e861661b463cd3d Mon Sep 17 00:00:00 2001 From: Hernando Castano Date: Wed, 27 May 2020 12:31:48 -0400 Subject: [PATCH] Add a subscription manager (#548) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * WIP: Add a subscription manager. The idea is to use the `Subscriptions` struct from Substrate, which is used to drive subscription futures to completion, and modify it for "general" use. * Allow IdProvider::Id and SubscriptionId to work together Adds trait bounds that allow conversion between the two, removing the need for generics in SubscriptionId. * Update SubscriptionId tests * Rustfmt * Use `SubscriptionId` as the key for `active_subscriptions` * Add subscription ID providers. Adds two subscription ID providers which can be used by the SubscriptionManager. One provides a simple numeric ID, while the other provides a random string. * Add some documentation * Clean up comment and naming * Change the NumericIdProvider to use `u64` IDs Instead of providing a guarantee that we can convert between `usize` and `u64` we make the assumptions that it's unlikely that we're running on an architecture larger than 64-bits and we use a `u64` directly. * Add tests for IdProvider and SubscriptionManager Note: There's one test that doesn't pass yet which has to do with the `cancel()` function of the SubscriptionManager. * Restore RandomStringIdProvider as the default provider * Retain receiver.: * Make test executor a lazy static * Rustfmt * Add a comment to test * Remove `matches!` macro Our Windows CI runner isn't up to date and thinks this is still a nightly feature Co-authored-by: Tomasz Drwięga --- pubsub/Cargo.toml | 3 + pubsub/src/lib.rs | 1 + pubsub/src/manager.rs | 385 ++++++++++++++++++++++++++++++++++++++++++ pubsub/src/types.rs | 102 ++++++++--- 4 files changed, 464 insertions(+), 27 deletions(-) create mode 100644 pubsub/src/manager.rs diff --git a/pubsub/Cargo.toml b/pubsub/Cargo.toml index 852371f22..f7ccbc046 100644 --- a/pubsub/Cargo.toml +++ b/pubsub/Cargo.toml @@ -15,9 +15,12 @@ log = "0.4" parking_lot = "0.10.0" jsonrpc-core = { version = "14.1", path = "../core" } serde = "1.0" +rand = "0.7" [dev-dependencies] jsonrpc-tcp-server = { version = "14.1", path = "../tcp" } +futures = { version = "0.3", features = ["compat", "thread-pool"] } +lazy_static = "1.4" [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/pubsub/src/lib.rs b/pubsub/src/lib.rs index a4f74eca5..145304486 100644 --- a/pubsub/src/lib.rs +++ b/pubsub/src/lib.rs @@ -9,6 +9,7 @@ extern crate log; mod delegates; mod handler; +pub mod manager; pub mod oneshot; mod subscription; pub mod typed; diff --git a/pubsub/src/manager.rs b/pubsub/src/manager.rs new file mode 100644 index 000000000..e8e725949 --- /dev/null +++ b/pubsub/src/manager.rs @@ -0,0 +1,385 @@ +//! The SubscriptionManager used to manage subscription based RPCs. +//! +//! The manager provides four main things in terms of functionality: +//! +//! 1. The ability to create unique subscription IDs through the +//! use of the `IdProvider` trait. Two implementations are availble +//! out of the box, a `NumericIdProvider` and a `RandomStringIdProvider`. +//! +//! 2. An executor with which to drive `Future`s to completion. +//! +//! 3. A way to add new subscriptions. Subscriptions should come in the form +//! of a `Stream`. These subscriptions will be transformed into notifications +//! by the manager, which can be consumed by the client. +//! +//! 4. A way to cancel any currently active subscription. + +use std::collections::HashMap; +use std::iter; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use crate::core::futures::sync::oneshot; +use crate::core::futures::{future as future01, Future as Future01}; +use crate::{ + typed::{Sink, Subscriber}, + SubscriptionId, +}; + +use log::{error, warn}; +use parking_lot::Mutex; +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; + +/// Alias for an implementation of `futures::future::Executor`. +pub type TaskExecutor = Arc + Send>> + Send + Sync>; + +type ActiveSubscriptions = Arc>>>; + +/// Trait used to provide unique subscription IDs. +pub trait IdProvider { + /// A unique ID used to identify a subscription. + type Id: Default + Into; + + /// Returns the next ID for the subscription. + fn next_id(&self) -> Self::Id; +} + +/// Provides a thread-safe incrementing integer which +/// can be used as a subscription ID. +#[derive(Debug)] +pub struct NumericIdProvider { + current_id: AtomicUsize, +} + +impl NumericIdProvider { + /// Create a new NumericIdProvider. + pub fn new() -> Self { + Default::default() + } + + /// Create a new NumericIdProvider starting from + /// the given ID. + pub fn with_id(id: AtomicUsize) -> Self { + Self { current_id: id } + } +} + +impl IdProvider for NumericIdProvider { + type Id = u64; + + fn next_id(&self) -> Self::Id { + self.current_id.fetch_add(1, Ordering::AcqRel) as u64 + } +} + +impl Default for NumericIdProvider { + fn default() -> Self { + NumericIdProvider { + current_id: AtomicUsize::new(1), + } + } +} + +/// Used to generate random strings for use as +/// subscription IDs. +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub struct RandomStringIdProvider { + len: usize, +} + +impl RandomStringIdProvider { + /// Create a new RandomStringIdProvider. + pub fn new() -> Self { + Default::default() + } + + /// Create a new RandomStringIdProvider, which will generate + /// random id strings of the given length. + pub fn with_len(len: usize) -> Self { + Self { len } + } +} + +impl IdProvider for RandomStringIdProvider { + type Id = String; + + fn next_id(&self) -> Self::Id { + let mut rng = thread_rng(); + let id: String = iter::repeat(()) + .map(|()| rng.sample(Alphanumeric)) + .take(self.len) + .collect(); + id + } +} + +impl Default for RandomStringIdProvider { + fn default() -> Self { + Self { len: 16 } + } +} + +/// Subscriptions manager. +/// +/// Takes care of assigning unique subscription ids and +/// driving the sinks into completion. +#[derive(Clone)] +pub struct SubscriptionManager { + id_provider: I, + active_subscriptions: ActiveSubscriptions, + executor: TaskExecutor, +} + +impl SubscriptionManager { + /// Creates a new SubscriptionManager. + /// + /// Uses `RandomStringIdProvider` as the ID provider. + pub fn new(executor: TaskExecutor) -> Self { + Self { + id_provider: RandomStringIdProvider::default(), + active_subscriptions: Default::default(), + executor, + } + } +} + +impl SubscriptionManager { + /// Creates a new SubscriptionManager with the specified + /// ID provider. + pub fn with_id_provider(id_provider: I, executor: TaskExecutor) -> Self { + Self { + id_provider, + active_subscriptions: Default::default(), + executor, + } + } + + /// Borrows the internal task executor. + /// + /// This can be used to spawn additional tasks on the underlying event loop. + pub fn executor(&self) -> &TaskExecutor { + &self.executor + } + + /// Creates new subscription for given subscriber. + /// + /// Second parameter is a function that converts Subscriber Sink into a Future. + /// This future will be driven to completion by the underlying event loop + pub fn add(&self, subscriber: Subscriber, into_future: G) -> SubscriptionId + where + G: FnOnce(Sink) -> R, + R: future01::IntoFuture, + F: future01::Future + Send + 'static, + { + let id = self.id_provider.next_id(); + let subscription_id: SubscriptionId = id.into(); + if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) { + let (tx, rx) = oneshot::channel(); + let future = into_future(sink) + .into_future() + .select(rx.map_err(|e| warn!("Error timing out: {:?}", e))) + .then(|_| Ok(())); + + self.active_subscriptions.lock().insert(subscription_id.clone(), tx); + if self.executor.execute(Box::new(future)).is_err() { + error!("Failed to spawn RPC subscription task"); + } + } + + subscription_id + } + + /// Cancel subscription. + /// + /// Returns true if subscription existed or false otherwise. + pub fn cancel(&self, id: SubscriptionId) -> bool { + if let Some(tx) = self.active_subscriptions.lock().remove(&id) { + let _ = tx.send(()); + return true; + } + + false + } +} + +impl SubscriptionManager { + /// Creates a new SubscriptionManager. + pub fn with_executor(executor: TaskExecutor) -> Self { + Self { + id_provider: Default::default(), + active_subscriptions: Default::default(), + executor, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::typed::Subscriber; + use futures::{compat::Future01CompatExt, executor, FutureExt}; + use futures::{stream, StreamExt, TryStreamExt}; + + use crate::core::futures::sink::Sink as Sink01; + use crate::core::futures::stream::Stream as Stream01; + + // Executor shared by all tests. + // + // This shared executor is used to prevent `Too many open files` errors + // on systems with a lot of cores. + lazy_static::lazy_static! { + static ref EXECUTOR: executor::ThreadPool = executor::ThreadPool::new() + .expect("Failed to create thread pool executor for tests"); + } + + pub struct TestTaskExecutor; + type Boxed01Future01 = Box + Send + 'static>; + + impl future01::Executor for TestTaskExecutor { + fn execute(&self, future: Boxed01Future01) -> std::result::Result<(), future01::ExecuteError> { + EXECUTOR.spawn_ok(future.compat().map(drop)); + Ok(()) + } + } + + #[test] + fn making_a_numeric_id_provider_works() { + let provider = NumericIdProvider::new(); + let expected_id = 1; + let actual_id = provider.next_id(); + + assert_eq!(actual_id, expected_id); + } + + #[test] + fn default_numeric_id_provider_works() { + let provider: NumericIdProvider = Default::default(); + let expected_id = 1; + let actual_id = provider.next_id(); + + assert_eq!(actual_id, expected_id); + } + + #[test] + fn numeric_id_provider_with_id_works() { + let provider = NumericIdProvider::with_id(AtomicUsize::new(5)); + let expected_id = 5; + let actual_id = provider.next_id(); + + assert_eq!(actual_id, expected_id); + } + + #[test] + fn random_string_provider_returns_id_with_correct_default_len() { + let provider = RandomStringIdProvider::new(); + let expected_len = 16; + let actual_len = provider.next_id().len(); + + assert_eq!(actual_len, expected_len); + } + + #[test] + fn random_string_provider_returns_id_with_correct_user_given_len() { + let expected_len = 10; + let provider = RandomStringIdProvider::with_len(expected_len); + let actual_len = provider.next_id().len(); + + assert_eq!(actual_len, expected_len); + } + + #[test] + fn new_subscription_manager_defaults_to_random_string_provider() { + let manager = SubscriptionManager::new(Arc::new(TestTaskExecutor)); + let subscriber = Subscriber::::new_test("test_subTest").0; + let stream = stream::iter(vec![Ok(1)]).compat(); + + let id = manager.add(subscriber, |sink| { + let stream = stream.map(|res| Ok(res)); + + sink.sink_map_err(|_| ()).send_all(stream).map(|_| ()) + }); + + if let SubscriptionId::String(_) = id { + assert!(true) + } else { + assert!(false, "Expected SubscriptionId::String"); + } + } + + #[test] + fn new_subscription_manager_works_with_numeric_id_provider() { + let id_provider = NumericIdProvider::default(); + let manager = SubscriptionManager::with_id_provider(id_provider, Arc::new(TestTaskExecutor)); + + let subscriber = Subscriber::::new_test("test_subTest").0; + let stream = stream::iter(vec![Ok(1)]).compat(); + + let id = manager.add(subscriber, |sink| { + let stream = stream.map(|res| Ok(res)); + + sink.sink_map_err(|_| ()).send_all(stream).map(|_| ()) + }); + + if let SubscriptionId::Number(_) = id { + assert!(true) + } else { + assert!(false, "Expected SubscriptionId::Number"); + } + } + + #[test] + fn new_subscription_manager_works_with_random_string_provider() { + let id_provider = RandomStringIdProvider::default(); + let manager = SubscriptionManager::with_id_provider(id_provider, Arc::new(TestTaskExecutor)); + + let subscriber = Subscriber::::new_test("test_subTest").0; + let stream = stream::iter(vec![Ok(1)]).compat(); + + let id = manager.add(subscriber, |sink| { + let stream = stream.map(|res| Ok(res)); + + sink.sink_map_err(|_| ()).send_all(stream).map(|_| ()) + }); + + if let SubscriptionId::String(_) = id { + assert!(true) + } else { + assert!(false, "Expected SubscriptionId::String"); + } + } + + #[test] + fn subscription_is_canceled_if_it_existed() { + let manager = SubscriptionManager::::with_executor(Arc::new(TestTaskExecutor)); + // Need to bind receiver here (unlike the other tests) or else the subscriber + // will think the client has disconnected and not update `active_subscriptions` + let (subscriber, _recv, _) = Subscriber::::new_test("test_subTest"); + + let (mut tx, rx) = futures::channel::mpsc::channel(8); + tx.start_send(1).unwrap(); + let stream = rx.map(|v| Ok::<_, ()>(v)).compat(); + + let id = manager.add(subscriber, |sink| { + let stream = stream.map(|res| Ok(res)); + + sink.sink_map_err(|_| ()).send_all(stream).map(|_| ()) + }); + + let is_cancelled = manager.cancel(id); + assert!(is_cancelled); + } + + #[test] + fn subscription_is_not_canceled_because_it_didnt_exist() { + let manager = SubscriptionManager::new(Arc::new(TestTaskExecutor)); + + let id: SubscriptionId = 23u32.into(); + let is_cancelled = manager.cancel(id); + let is_not_cancelled = !is_cancelled; + + assert!(is_not_cancelled); + } +} diff --git a/pubsub/src/types.rs b/pubsub/src/types.rs index be5fefc49..9e1437e74 100644 --- a/pubsub/src/types.rs +++ b/pubsub/src/types.rs @@ -35,12 +35,13 @@ impl PubSubMetadata for Option { } /// Unique subscription id. +/// /// NOTE Assigning same id to different requests will cause the previous request to be unsubscribed. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum SubscriptionId { - /// U64 number + /// A numerical ID, represented by a `u64`. Number(u64), - /// String + /// A non-numerical ID, for example a hash. String(String), } @@ -61,12 +62,6 @@ impl From for SubscriptionId { } } -impl From for SubscriptionId { - fn from(other: u64) -> Self { - SubscriptionId::Number(other) - } -} - impl From for core::Value { fn from(sub: SubscriptionId) -> Self { match sub { @@ -76,30 +71,83 @@ impl From for core::Value { } } +macro_rules! impl_from_num { + ($num:ty) => { + impl From<$num> for SubscriptionId { + fn from(other: $num) -> Self { + SubscriptionId::Number(other.into()) + } + } + }; +} + +impl_from_num!(u8); +impl_from_num!(u16); +impl_from_num!(u32); +impl_from_num!(u64); + #[cfg(test)] mod tests { use super::SubscriptionId; use crate::core::Value; #[test] - fn should_convert_between_value_and_subscription_id() { - // given - let val1 = Value::Number(5.into()); - let val2 = Value::String("asdf".into()); - let val3 = Value::Null; - - // when - let res1 = SubscriptionId::parse_value(&val1); - let res2 = SubscriptionId::parse_value(&val2); - let res3 = SubscriptionId::parse_value(&val3); - - // then - assert_eq!(res1, Some(SubscriptionId::Number(5))); - assert_eq!(res2, Some(SubscriptionId::String("asdf".into()))); - assert_eq!(res3, None); - - // and back - assert_eq!(Value::from(res1.unwrap()), val1); - assert_eq!(Value::from(res2.unwrap()), val2); + fn should_convert_between_number_value_and_subscription_id() { + let val = Value::Number(5.into()); + let res = SubscriptionId::parse_value(&val); + + assert_eq!(res, Some(SubscriptionId::Number(5))); + assert_eq!(Value::from(res.unwrap()), val); + } + + #[test] + fn should_convert_between_string_value_and_subscription_id() { + let val = Value::String("asdf".into()); + let res = SubscriptionId::parse_value(&val); + + assert_eq!(res, Some(SubscriptionId::String("asdf".into()))); + assert_eq!(Value::from(res.unwrap()), val); + } + + #[test] + fn should_convert_between_null_value_and_subscription_id() { + let val = Value::Null; + let res = SubscriptionId::parse_value(&val); + assert_eq!(res, None); + } + + #[test] + fn should_convert_from_u8_to_subscription_id() { + let val = 5u8; + let res: SubscriptionId = val.into(); + assert_eq!(res, SubscriptionId::Number(5)); + } + + #[test] + fn should_convert_from_u16_to_subscription_id() { + let val = 5u16; + let res: SubscriptionId = val.into(); + assert_eq!(res, SubscriptionId::Number(5)); + } + + #[test] + fn should_convert_from_u32_to_subscription_id() { + let val = 5u32; + let res: SubscriptionId = val.into(); + assert_eq!(res, SubscriptionId::Number(5)); + } + + #[test] + fn should_convert_from_u64_to_subscription_id() { + let val = 5u64; + let res: SubscriptionId = val.into(); + assert_eq!(res, SubscriptionId::Number(5)); + } + + #[test] + fn should_convert_from_string_to_subscription_id() { + let val = "String".to_string(); + let res: SubscriptionId = val.into(); + assert_eq!(res, SubscriptionId::String("String".to_string())); } }