Skip to content

Commit

Permalink
Implement BlocksClient for working with blocks (#671)
Browse files Browse the repository at this point in the history
* rpc: Fill in any missing finalized blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Move fill blocks test to RPC location

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* events: Remove the fill in strategy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Introduce blocks client

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client: Enable the block API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Simplify `subscribe_finalized_headers` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Add tests for `subscribe_finalized_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Implement `subscribe_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Add tests for `subscribe_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Move `missing_block_headers_will_be_filled_in` to blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* events: Use the new subscribe to blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Change API to return future similar to events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* events: Use blocks API for subscribing to blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update subxt/src/blocks/blocks_client.rs

Co-authored-by: James Wilson <james@jsdw.me>

* blocks: Simplify docs for `subscribe_finalized_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Use `PhantomDataSendSync` to avoid other bounds on `T: Config`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Add docs for best blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Avoid one clone for the `client.rpc()`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update testing/integration-tests/src/blocks/mod.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* blocks: Improve `subscribe_headers` doc

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
  • Loading branch information
3 people authored Oct 10, 2022
1 parent 10def3c commit 95e6aa9
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 137 deletions.
152 changes: 152 additions & 0 deletions subxt/src/blocks/blocks_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use crate::{
client::OnlineClientT,
error::Error,
utils::PhantomDataSendSync,
Config,
};
use derivative::Derivative;
use futures::{
future::Either,
stream,
Stream,
StreamExt,
};
use sp_runtime::traits::Header;
use std::future::Future;

/// A client for working with blocks.
#[derive(Derivative)]
#[derivative(Clone(bound = "Client: Clone"))]
pub struct BlocksClient<T, Client> {
client: Client,
_marker: PhantomDataSendSync<T>,
}

impl<T, Client> BlocksClient<T, Client> {
/// Create a new [`BlocksClient`].
pub fn new(client: Client) -> Self {
Self {
client,
_marker: PhantomDataSendSync::new(),
}
}
}

impl<T, Client> BlocksClient<T, Client>
where
T: Config,
Client: OnlineClientT<T>,
{
/// Subscribe to new best block headers.
///
/// # Note
///
/// This does not produce all the blocks from the chain, just the best blocks.
/// The best block is selected by the consensus algorithm.
/// This calls under the hood the `chain_subscribeNewHeads` RPC method, if you need
/// a subscription of all the blocks please use the `chain_subscribeAllHeads` method.
///
/// These blocks haven't necessarily been finalised yet. Prefer
/// [`BlocksClient::subscribe_finalized_headers()`] if that is important.
pub fn subscribe_headers(
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<T::Header, Error>>, Error>>
+ Send
+ 'static {
let client = self.client.clone();
async move { client.rpc().subscribe_blocks().await }
}

/// Subscribe to finalized block headers.
///
/// While the Substrate RPC method does not guarantee that all finalized block headers are
/// provided, this function does.
/// ```
pub fn subscribe_finalized_headers(
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<T::Header, Error>>, Error>>
+ Send
+ 'static {
let client = self.client.clone();
async move { subscribe_finalized_headers(client).await }
}
}

async fn subscribe_finalized_headers<T, Client>(
client: Client,
) -> Result<impl Stream<Item = Result<T::Header, Error>>, Error>
where
T: Config,
Client: OnlineClientT<T>,
{
// Fetch the last finalised block details immediately, so that we'll get
// all blocks after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_num = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

let sub = client.rpc().subscribe_finalized_blocks().await?;

// Adjust the subscription stream to fill in any missing blocks.
Ok(
subscribe_to_block_headers_filling_in_gaps(client, last_finalized_block_num, sub)
.boxed(),
)
}

/// Note: This is exposed for testing but is not considered stable and may change
/// without notice in a patch release.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<T, Client, S, E>(
client: Client,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, Error>> + Send
where
T: Config,
Client: OnlineClientT<T>,
S: Stream<Item = Result<T::Header, E>> + Send,
E: Into<Error> + Send + 'static,
{
sub.flat_map(move |s| {
let client = client.clone();

// Get the header, or return a stream containing just the error.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
let rpc = client.rpc().clone();
async move {
let hash = rpc.block_hash(Some(n.into())).await?;
let header = rpc.header(hash).await?;
Ok::<_, Error>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}
12 changes: 12 additions & 0 deletions subxt/src/blocks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

//! This module exposes the necessary functionality for working with events.

mod blocks_client;

pub use blocks_client::{
subscribe_to_block_headers_filling_in_gaps,
BlocksClient,
};
6 changes: 6 additions & 0 deletions subxt/src/client/offline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// see LICENSE for license details.

use crate::{
blocks::BlocksClient,
constants::ConstantsClient,
events::EventsClient,
rpc::RuntimeVersion,
Expand Down Expand Up @@ -43,6 +44,11 @@ pub trait OfflineClientT<T: Config>: Clone + Send + Sync + 'static {
fn constants(&self) -> ConstantsClient<T, Self> {
ConstantsClient::new(self.clone())
}

/// Work with blocks.
fn blocks(&self) -> BlocksClient<T, Self> {
BlocksClient::new(self.clone())
}
}

/// A client that is capable of performing offline-only operations.
Expand Down
6 changes: 6 additions & 0 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::{
OfflineClientT,
};
use crate::{
blocks::BlocksClient,
constants::ConstantsClient,
error::Error,
events::EventsClient,
Expand Down Expand Up @@ -203,6 +204,11 @@ impl<T: Config> OnlineClient<T> {
pub fn constants(&self) -> ConstantsClient<T, Self> {
<Self as OfflineClientT<T>>::constants(self)
}

/// Work with blocks.
pub fn blocks(&self) -> BlocksClient<T, Self> {
<Self as OfflineClientT<T>>::blocks(self)
}
}

impl<T: Config> OfflineClientT<T> for OnlineClient<T> {
Expand Down
3 changes: 1 addition & 2 deletions subxt/src/events/event_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
client::OnlineClientT,
error::Error,
events::EventsClient,
rpc::Subscription,
Config,
};
use derivative::Derivative;
Expand Down Expand Up @@ -40,7 +39,7 @@ pub type FinalizedEventSub<Header> = BoxStream<'static, Result<Header, Error>>;
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;
pub type EventSub<Item> = BoxStream<'static, Result<Item, Error>>;

/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
Expand Down
86 changes: 7 additions & 79 deletions subxt/src/events/events_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,10 @@ use crate::{
Config,
};
use derivative::Derivative;
use futures::{
future::Either,
stream,
Stream,
StreamExt,
};
use sp_core::{
storage::StorageKey,
twox_128,
};
use sp_runtime::traits::Header;
use std::future::Future;

/// A client for working with events.
Expand Down Expand Up @@ -96,7 +89,10 @@ where
) -> impl Future<
Output = Result<EventSubscription<T, Client, EventSub<T::Header>>, Error>,
> + Send
+ 'static {
+ 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
async move { subscribe(client).await }
}
Expand Down Expand Up @@ -157,8 +153,8 @@ where
T: Config,
Client: OnlineClientT<T>,
{
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
let block_subscription = client.blocks().subscribe_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Subscribe to events from finalized blocks.
Expand All @@ -169,78 +165,10 @@ where
T: Config,
Client: OnlineClientT<T>,
{
// fetch the last finalised block details immediately, so that we'll get
// events for each block after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_number = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

let sub = client.rpc().subscribe_finalized_blocks().await?;

// Fill in any gaps between the block above and the finalized blocks reported.
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
client.clone(),
last_finalized_block_number,
sub,
);

let block_subscription = client.blocks().subscribe_finalized_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Note: This is exposed for testing but is not considered stable and may change
/// without notice in a patch release.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<T, Client, S, E>(
client: Client,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, Error>> + Send
where
T: Config,
Client: OnlineClientT<T> + Send + Sync,
S: Stream<Item = Result<T::Header, E>> + Send,
E: Into<Error> + Send + 'static,
{
sub.flat_map(move |s| {
let client = client.clone();

// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
let client = client.clone();
async move {
let hash = client.rpc().block_hash(Some(n.into())).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, Error>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}

// The storage key needed to access events.
fn system_events_key() -> StorageKey {
let mut storage_key = twox_128(b"System").to_vec();
Expand Down
5 changes: 1 addition & 4 deletions subxt/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ pub use event_subscription::{
EventSubscription,
FinalizedEventSub,
};
pub use events_client::{
subscribe_to_block_headers_filling_in_gaps,
EventsClient,
};
pub use events_client::EventsClient;
pub use events_type::{
EventDetails,
Events,
Expand Down
3 changes: 2 additions & 1 deletion subxt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@

pub use subxt_macro::subxt;

pub mod blocks;
pub mod client;
pub mod config;
pub mod constants;
Expand All @@ -148,7 +149,7 @@ pub mod tx;
pub mod utils;

// Expose a few of the most common types at root,
// but leave most types behind their respoctive modules.
// but leave most types behind their respective modules.
pub use crate::{
client::{
OfflineClient,
Expand Down
Loading

0 comments on commit 95e6aa9

Please sign in to comment.