diff --git a/subxt/src/blocks/blocks_client.rs b/subxt/src/blocks/blocks_client.rs new file mode 100644 index 0000000000..190ec9f4f1 --- /dev/null +++ b/subxt/src/blocks/blocks_client.rs @@ -0,0 +1,152 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use crate::{ + client::OnlineClientT, + error::Error, + utils::PhantomDataSendSync, + Config, +}; +use derivative::Derivative; +use futures::{ + future::Either, + stream, + Stream, + StreamExt, +}; +use sp_runtime::traits::Header; +use std::future::Future; + +/// A client for working with blocks. +#[derive(Derivative)] +#[derivative(Clone(bound = "Client: Clone"))] +pub struct BlocksClient { + client: Client, + _marker: PhantomDataSendSync, +} + +impl BlocksClient { + /// Create a new [`BlocksClient`]. + pub fn new(client: Client) -> Self { + Self { + client, + _marker: PhantomDataSendSync::new(), + } + } +} + +impl BlocksClient +where + T: Config, + Client: OnlineClientT, +{ + /// Subscribe to new best block headers. + /// + /// # Note + /// + /// This does not produce all the blocks from the chain, just the best blocks. + /// The best block is selected by the consensus algorithm. + /// This calls under the hood the `chain_subscribeNewHeads` RPC method, if you need + /// a subscription of all the blocks please use the `chain_subscribeAllHeads` method. + /// + /// These blocks haven't necessarily been finalised yet. Prefer + /// [`BlocksClient::subscribe_finalized_headers()`] if that is important. + pub fn subscribe_headers( + &self, + ) -> impl Future>, Error>> + + Send + + 'static { + let client = self.client.clone(); + async move { client.rpc().subscribe_blocks().await } + } + + /// Subscribe to finalized block headers. + /// + /// While the Substrate RPC method does not guarantee that all finalized block headers are + /// provided, this function does. + /// ``` + pub fn subscribe_finalized_headers( + &self, + ) -> impl Future>, Error>> + + Send + + 'static { + let client = self.client.clone(); + async move { subscribe_finalized_headers(client).await } + } +} + +async fn subscribe_finalized_headers( + client: Client, +) -> Result>, Error> +where + T: Config, + Client: OnlineClientT, +{ + // Fetch the last finalised block details immediately, so that we'll get + // all blocks after this one. + let last_finalized_block_hash = client.rpc().finalized_head().await?; + let last_finalized_block_num = client + .rpc() + .header(Some(last_finalized_block_hash)) + .await? + .map(|h| (*h.number()).into()); + + let sub = client.rpc().subscribe_finalized_blocks().await?; + + // Adjust the subscription stream to fill in any missing blocks. + Ok( + subscribe_to_block_headers_filling_in_gaps(client, last_finalized_block_num, sub) + .boxed(), + ) +} + +/// Note: This is exposed for testing but is not considered stable and may change +/// without notice in a patch release. +#[doc(hidden)] +pub fn subscribe_to_block_headers_filling_in_gaps( + client: Client, + mut last_block_num: Option, + sub: S, +) -> impl Stream> + Send +where + T: Config, + Client: OnlineClientT, + S: Stream> + Send, + E: Into + Send + 'static, +{ + sub.flat_map(move |s| { + let client = client.clone(); + + // Get the header, or return a stream containing just the error. + let header = match s { + Ok(header) => header, + Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), + }; + + // We want all previous details up to, but not including this current block num. + let end_block_num = (*header.number()).into(); + + // This is one after the last block we returned details for last time. + let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num); + + // Iterate over all of the previous blocks we need headers for, ignoring the current block + // (which we already have the header info for): + let previous_headers = stream::iter(start_block_num..end_block_num) + .then(move |n| { + let rpc = client.rpc().clone(); + async move { + let hash = rpc.block_hash(Some(n.into())).await?; + let header = rpc.header(hash).await?; + Ok::<_, Error>(header) + } + }) + .filter_map(|h| async { h.transpose() }); + + // On the next iteration, we'll get details starting just after this end block. + last_block_num = Some(end_block_num); + + // Return a combination of any previous headers plus the new header. + Either::Right(previous_headers.chain(stream::once(async { Ok(header) }))) + }) +} diff --git a/subxt/src/blocks/mod.rs b/subxt/src/blocks/mod.rs new file mode 100644 index 0000000000..f3a575e8bd --- /dev/null +++ b/subxt/src/blocks/mod.rs @@ -0,0 +1,12 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! This module exposes the necessary functionality for working with events. + +mod blocks_client; + +pub use blocks_client::{ + subscribe_to_block_headers_filling_in_gaps, + BlocksClient, +}; diff --git a/subxt/src/client/offline_client.rs b/subxt/src/client/offline_client.rs index 630bb745ac..0abb78f3ac 100644 --- a/subxt/src/client/offline_client.rs +++ b/subxt/src/client/offline_client.rs @@ -3,6 +3,7 @@ // see LICENSE for license details. use crate::{ + blocks::BlocksClient, constants::ConstantsClient, events::EventsClient, rpc::RuntimeVersion, @@ -43,6 +44,11 @@ pub trait OfflineClientT: Clone + Send + Sync + 'static { fn constants(&self) -> ConstantsClient { ConstantsClient::new(self.clone()) } + + /// Work with blocks. + fn blocks(&self) -> BlocksClient { + BlocksClient::new(self.clone()) + } } /// A client that is capable of performing offline-only operations. diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index 3fe6ca1ca2..5fa9b3c898 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -7,6 +7,7 @@ use super::{ OfflineClientT, }; use crate::{ + blocks::BlocksClient, constants::ConstantsClient, error::Error, events::EventsClient, @@ -203,6 +204,11 @@ impl OnlineClient { pub fn constants(&self) -> ConstantsClient { >::constants(self) } + + /// Work with blocks. + pub fn blocks(&self) -> BlocksClient { + >::blocks(self) + } } impl OfflineClientT for OnlineClient { diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index c0d42e9be4..85fd4f221a 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -8,7 +8,6 @@ use crate::{ client::OnlineClientT, error::Error, events::EventsClient, - rpc::Subscription, Config, }; use derivative::Derivative; @@ -40,7 +39,7 @@ pub type FinalizedEventSub
= BoxStream<'static, Result>; /// A Subscription. This forms a part of the `EventSubscription` type handed back /// in codegen from `subscribe`, and is exposed to be used in codegen. #[doc(hidden)] -pub type EventSub = Subscription; +pub type EventSub = BoxStream<'static, Result>; /// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. #[derive(Derivative)] diff --git a/subxt/src/events/events_client.rs b/subxt/src/events/events_client.rs index 27afcc5fd5..fc19231cf9 100644 --- a/subxt/src/events/events_client.rs +++ b/subxt/src/events/events_client.rs @@ -14,17 +14,10 @@ use crate::{ Config, }; use derivative::Derivative; -use futures::{ - future::Either, - stream, - Stream, - StreamExt, -}; use sp_core::{ storage::StorageKey, twox_128, }; -use sp_runtime::traits::Header; use std::future::Future; /// A client for working with events. @@ -96,7 +89,10 @@ where ) -> impl Future< Output = Result>, Error>, > + Send - + 'static { + + 'static + where + Client: Send + Sync + 'static, + { let client = self.client.clone(); async move { subscribe(client).await } } @@ -157,8 +153,8 @@ where T: Config, Client: OnlineClientT, { - let block_subscription = client.rpc().subscribe_blocks().await?; - Ok(EventSubscription::new(client, block_subscription)) + let block_subscription = client.blocks().subscribe_headers().await?; + Ok(EventSubscription::new(client, Box::pin(block_subscription))) } /// Subscribe to events from finalized blocks. @@ -169,78 +165,10 @@ where T: Config, Client: OnlineClientT, { - // fetch the last finalised block details immediately, so that we'll get - // events for each block after this one. - let last_finalized_block_hash = client.rpc().finalized_head().await?; - let last_finalized_block_number = client - .rpc() - .header(Some(last_finalized_block_hash)) - .await? - .map(|h| (*h.number()).into()); - - let sub = client.rpc().subscribe_finalized_blocks().await?; - - // Fill in any gaps between the block above and the finalized blocks reported. - let block_subscription = subscribe_to_block_headers_filling_in_gaps( - client.clone(), - last_finalized_block_number, - sub, - ); - + let block_subscription = client.blocks().subscribe_finalized_headers().await?; Ok(EventSubscription::new(client, Box::pin(block_subscription))) } -/// Note: This is exposed for testing but is not considered stable and may change -/// without notice in a patch release. -#[doc(hidden)] -pub fn subscribe_to_block_headers_filling_in_gaps( - client: Client, - mut last_block_num: Option, - sub: S, -) -> impl Stream> + Send -where - T: Config, - Client: OnlineClientT + Send + Sync, - S: Stream> + Send, - E: Into + Send + 'static, -{ - sub.flat_map(move |s| { - let client = client.clone(); - - // Get the header, or return a stream containing just the error. Our EventSubscription - // stream will return `None` as soon as it hits an error like this. - let header = match s { - Ok(header) => header, - Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), - }; - - // We want all previous details up to, but not including this current block num. - let end_block_num = (*header.number()).into(); - - // This is one after the last block we returned details for last time. - let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num); - - // Iterate over all of the previous blocks we need headers for, ignoring the current block - // (which we already have the header info for): - let previous_headers = stream::iter(start_block_num..end_block_num) - .then(move |n| { - let client = client.clone(); - async move { - let hash = client.rpc().block_hash(Some(n.into())).await?; - let header = client.rpc().header(hash).await?; - Ok::<_, Error>(header) - } - }) - .filter_map(|h| async { h.transpose() }); - - // On the next iteration, we'll get details starting just after this end block. - last_block_num = Some(end_block_num); - - // Return a combination of any previous headers plus the new header. - Either::Right(previous_headers.chain(stream::once(async { Ok(header) }))) - }) -} - // The storage key needed to access events. fn system_events_key() -> StorageKey { let mut storage_key = twox_128(b"System").to_vec(); diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index 2019253e0b..fb66f1502c 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -16,10 +16,7 @@ pub use event_subscription::{ EventSubscription, FinalizedEventSub, }; -pub use events_client::{ - subscribe_to_block_headers_filling_in_gaps, - EventsClient, -}; +pub use events_client::EventsClient; pub use events_type::{ EventDetails, Events, diff --git a/subxt/src/lib.rs b/subxt/src/lib.rs index d7196e883f..38287f4ca9 100644 --- a/subxt/src/lib.rs +++ b/subxt/src/lib.rs @@ -135,6 +135,7 @@ pub use subxt_macro::subxt; +pub mod blocks; pub mod client; pub mod config; pub mod constants; @@ -148,7 +149,7 @@ pub mod tx; pub mod utils; // Expose a few of the most common types at root, -// but leave most types behind their respoctive modules. +// but leave most types behind their respective modules. pub use crate::{ client::{ OfflineClient, diff --git a/testing/integration-tests/src/blocks/mod.rs b/testing/integration-tests/src/blocks/mod.rs new file mode 100644 index 0000000000..3ec348b6d2 --- /dev/null +++ b/testing/integration-tests/src/blocks/mod.rs @@ -0,0 +1,89 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use crate::test_context; +use futures::StreamExt; + +// Check that we can subscribe to non-finalized blocks. +#[tokio::test] +async fn non_finalized_headers_subscription() -> Result<(), subxt::Error> { + let ctx = test_context().await; + let api = ctx.client(); + + let mut sub = api.blocks().subscribe_headers().await?; + + // Wait for the next set of headers, and check that the + // associated block hash is the one we just finalized. + // (this can be a bit slow as we have to wait for finalization) + let header = sub.next().await.unwrap()?; + let block_hash = header.hash(); + let current_block_hash = api.rpc().block_hash(None).await?.unwrap(); + + assert_eq!(block_hash, current_block_hash); + Ok(()) +} + +// Check that we can subscribe to finalized blocks. +#[tokio::test] +async fn finalized_headers_subscription() -> Result<(), subxt::Error> { + let ctx = test_context().await; + let api = ctx.client(); + + let mut sub = api.blocks().subscribe_finalized_headers().await?; + + // Wait for the next set of headers, and check that the + // associated block hash is the one we just finalized. + // (this can be a bit slow as we have to wait for finalization) + let header = sub.next().await.unwrap()?; + let finalized_hash = api.rpc().finalized_head().await?; + + assert_eq!(header.hash(), finalized_hash); + Ok(()) +} + +#[tokio::test] +async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::Error> { + let ctx = test_context().await; + let api = ctx.client(); + + // Manually subscribe to the next 6 finalized block headers, but deliberately + // filter out some in the middle so we get back b _ _ b _ b. This guarantees + // that there will be some gaps, even if there aren't any from the subscription. + let some_finalized_blocks = api + .rpc() + .subscribe_finalized_blocks() + .await? + .enumerate() + .take(6) + .filter(|(n, _)| { + let n = *n; + async move { n == 0 || n == 3 || n == 5 } + }) + .map(|(_, h)| h); + + // This should spot any gaps in the middle and fill them back in. + let all_finalized_blocks = subxt::blocks::subscribe_to_block_headers_filling_in_gaps( + ctx.client(), + None, + some_finalized_blocks, + ); + futures::pin_mut!(all_finalized_blocks); + + // Iterate the block headers, making sure we get them all in order. + let mut last_block_number = None; + while let Some(header) = all_finalized_blocks.next().await { + let header = header?; + + use sp_runtime::traits::Header; + let block_number: u128 = (*header.number()).into(); + + if let Some(last) = last_block_number { + assert_eq!(last + 1, block_number); + } + last_block_number = Some(block_number); + } + + assert!(last_block_number.is_some()); + Ok(()) +} diff --git a/testing/integration-tests/src/events/mod.rs b/testing/integration-tests/src/events/mod.rs index 1012e15240..bf629d600e 100644 --- a/testing/integration-tests/src/events/mod.rs +++ b/testing/integration-tests/src/events/mod.rs @@ -169,57 +169,6 @@ async fn balance_transfer_subscription() -> Result<(), subxt::Error> { Ok(()) } -#[tokio::test] -async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::Error> { - let ctx = test_context().await; - let api = ctx.client(); - - // This function is not publically available to use, but contains - // the key logic for filling in missing blocks, so we want to test it. - // This is used in `subscribe_finalized` to ensure no block headers are - // missed. - use subxt::events::subscribe_to_block_headers_filling_in_gaps; - - // Manually subscribe to the next 6 finalized block headers, but deliberately - // filter out some in the middle so we get back b _ _ b _ b. This guarantees - // that there will be some gaps, even if there aren't any from the subscription. - let some_finalized_blocks = api - .rpc() - .subscribe_finalized_blocks() - .await? - .enumerate() - .take(6) - .filter(|(n, _)| { - let n = *n; - async move { n == 0 || n == 3 || n == 5 } - }) - .map(|(_, h)| h); - - // This should spot any gaps in the middle and fill them back in. - let all_finalized_blocks = subscribe_to_block_headers_filling_in_gaps( - ctx.client(), - None, - some_finalized_blocks, - ); - futures::pin_mut!(all_finalized_blocks); - - // Iterate the block headers, making sure we get them all in order. - let mut last_block_number = None; - while let Some(header) = all_finalized_blocks.next().await { - let header = header?; - - use sp_runtime::traits::Header; - let block_number: u128 = (*header.number()).into(); - - if let Some(last) = last_block_number { - assert_eq!(last + 1, block_number); - } - last_block_number = Some(block_number); - } - - Ok(()) -} - // This is just a compile-time check that we can subscribe to events in // a context that requires the event subscription/filtering to be Send-able. // We test a typical use of EventSubscription and FilterEvents. We don't need diff --git a/testing/integration-tests/src/lib.rs b/testing/integration-tests/src/lib.rs index 1c8722c31d..4959d67506 100644 --- a/testing/integration-tests/src/lib.rs +++ b/testing/integration-tests/src/lib.rs @@ -9,6 +9,8 @@ mod codegen; #[cfg(test)] mod utils; +#[cfg(test)] +mod blocks; #[cfg(test)] mod client; #[cfg(test)]