Skip to content

Commit

Permalink
Get best and finalized head updates from subscription (#2166)
Browse files Browse the repository at this point in the history
* read best + best finalized headers from subscription

* spelling

* clippy
  • Loading branch information
svyatonik authored and bkontur committed May 22, 2024
1 parent 12d51c1 commit 12c59b5
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 13 deletions.
158 changes: 146 additions & 12 deletions bridges/relays/client-substrate/src/client/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,23 @@ use crate::{
HashOf, HeaderIdOf, HeaderOf, NonceOf, SignedBlockOf, SimpleRuntimeVersion, Subscription,
TransactionTracker, UnsignedTransaction, ANCIENT_BLOCK_THRESHOLD,
};
use std::future::Future;
use std::{cmp::Ordering, future::Future, task::Poll};

use async_std::sync::{Arc, Mutex, RwLock};
use async_std::{
sync::{Arc, Mutex, RwLock},
task::JoinHandle,
};
use async_trait::async_trait;
use codec::Encode;
use frame_support::weights::Weight;
use futures::{FutureExt, StreamExt};
use quick_cache::unsync::Cache;
use sp_consensus_grandpa::{AuthorityId, OpaqueKeyOwnershipProof, SetId};
use sp_core::{
storage::{StorageData, StorageKey},
Bytes, Pair,
};
use sp_runtime::transaction_validity::TransactionValidity;
use sp_runtime::{traits::Header as _, transaction_validity::TransactionValidity};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;

Expand All @@ -57,6 +61,9 @@ pub struct CachingClient<C: Chain, B: Client<C>> {
struct ClientData<C: Chain> {
grandpa_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
beefy_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
background_task_handle: Arc<Mutex<JoinHandle<Result<()>>>>,
best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
// `quick_cache::sync::Cache` has the `get_or_insert_async` method, which fits our needs,
// but it uses synchronization primitives that are not aware of async execution. They
// can block the executor threads and cause deadlocks => let's use primitives from
Expand All @@ -70,19 +77,32 @@ struct ClientData<C: Chain> {

impl<C: Chain, B: Client<C>> CachingClient<C, B> {
/// Creates new `CachingClient` on top of given `backend`.
pub fn new(backend: B) -> Self {
pub async fn new(backend: B) -> Self {
// most of relayer operations will never touch more than `ANCIENT_BLOCK_THRESHOLD`
// headers, so we'll use this as a cache capacity for all chain-related caches
let chain_state_capacity = ANCIENT_BLOCK_THRESHOLD as usize;
let best_header = Arc::new(RwLock::new(None));
let best_finalized_header = Arc::new(RwLock::new(None));
let header_by_hash_cache = Arc::new(RwLock::new(Cache::new(chain_state_capacity)));
let background_task_handle = Self::start_background_task(
backend.clone(),
best_header.clone(),
best_finalized_header.clone(),
header_by_hash_cache.clone(),
)
.await;
CachingClient {
backend,
data: Arc::new(ClientData {
grandpa_justifications: Arc::new(Mutex::new(None)),
beefy_justifications: Arc::new(Mutex::new(None)),
background_task_handle: Arc::new(Mutex::new(background_task_handle)),
best_header,
best_finalized_header,
header_hash_by_number_cache: Arc::new(RwLock::new(Cache::new(
chain_state_capacity,
))),
header_by_hash_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))),
header_by_hash_cache,
block_by_hash_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))),
raw_storage_value_cache: Arc::new(RwLock::new(Cache::new(1_024))),
state_call_cache: Arc::new(RwLock::new(Cache::new(1_024))),
Expand Down Expand Up @@ -114,6 +134,7 @@ impl<C: Chain, B: Client<C>> CachingClient<C, B> {
Ok(value)
}

/// Subscribe to finality justifications, trying to reuse existing subscription.
async fn subscribe_finality_justifications<'a>(
&'a self,
maybe_broadcaster: &Mutex<Option<SubscriptionBroadcaster<Bytes>>>,
Expand All @@ -133,6 +154,98 @@ impl<C: Chain, B: Client<C>> CachingClient<C, B> {

broadcaster.subscribe().await
}

/// Start background task that reads best (and best finalized) headers from subscriptions.
async fn start_background_task(
backend: B,
best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
header_by_hash_cache: SyncCache<HashOf<C>, HeaderOf<C>>,
) -> JoinHandle<Result<()>> {
async_std::task::spawn(async move {
// initialize by reading headers directly from backend to avoid doing that in the
// high-level code
let mut last_finalized_header =
backend.header_by_hash(backend.best_finalized_header_hash().await?).await?;
*best_header.write().await = Some(backend.best_header().await?);
*best_finalized_header.write().await = Some(last_finalized_header.clone());

// ...and then continue with subscriptions
let mut best_headers = backend.subscribe_best_headers().await?;
let mut finalized_headers = backend.subscribe_finalized_headers().await?;
loop {
futures::select! {
new_best_header = best_headers.next().fuse() => {
// we assume that the best header is always the actual best header, even if its
// number is lower than the number of previous-best-header (chain may use its own
// best header selection algorithms)
let new_best_header = new_best_header
.ok_or_else(|| Error::ChannelError(format!("Mandatory best headers subscription for {} has finished", C::NAME)))?;
let new_best_header_hash = new_best_header.hash();
header_by_hash_cache.write().await.insert(new_best_header_hash, new_best_header.clone());
*best_header.write().await = Some(new_best_header);
},
new_finalized_header = finalized_headers.next().fuse() => {
// in theory we'll always get finalized headers in order, but let's double check
let new_finalized_header = new_finalized_header.
ok_or_else(|| Error::ChannelError(format!("Finalized headers subscription for {} has finished", C::NAME)))?;
let new_finalized_header_number = *new_finalized_header.number();
let last_finalized_header_number = *last_finalized_header.number();
match new_finalized_header_number.cmp(&last_finalized_header_number) {
Ordering::Greater => {
let new_finalized_header_hash = new_finalized_header.hash();
header_by_hash_cache.write().await.insert(new_finalized_header_hash, new_finalized_header.clone());
*best_finalized_header.write().await = Some(new_finalized_header.clone());
last_finalized_header = new_finalized_header;
},
Ordering::Less => {
return Err(Error::unordered_finalized_headers::<C>(
new_finalized_header_number,
last_finalized_header_number,
));
},
_ => (),
}
},
}
}
})
}

/// Ensure that the background task is active.
async fn ensure_background_task_active(&self) -> Result<()> {
let mut background_task_handle = self.data.background_task_handle.lock().await;
if let Poll::Ready(result) = futures::poll!(&mut *background_task_handle) {
return Err(Error::ChannelError(format!(
"Background task of {} client has exited with result: {:?}",
C::NAME,
result
)))
}

Ok(())
}

/// Try to get header, read elsewhere by background task through subscription.
async fn read_header_from_background<'a>(
&'a self,
header: &Arc<RwLock<Option<HeaderOf<C>>>>,
read_header_from_backend: impl Future<Output = Result<HeaderOf<C>>> + 'a,
) -> Result<HeaderOf<C>> {
// ensure that the background task is active
self.ensure_background_task_active().await?;

// now we know that the background task is active, so we could trust that the
// `header` has the most recent updates from it
match header.read().await.clone() {
Some(header) => Ok(header),
None => {
// header has not yet been read from the subscription, which means that
// we are just starting - let's read header directly from backend this time
read_header_from_backend.await
},
}
}
}

impl<C: Chain, B: Client<C>> std::fmt::Debug for CachingClient<C, B> {
Expand Down Expand Up @@ -162,6 +275,16 @@ impl<C: Chain, B: Client<C>> Client<C> for CachingClient<C, B> {
// since we have new underlying client, we need to restart subscriptions too
*self.data.grandpa_justifications.lock().await = None;
*self.data.beefy_justifications.lock().await = None;
// also restart background task too
*self.data.best_header.write().await = None;
*self.data.best_finalized_header.write().await = None;
*self.data.background_task_handle.lock().await = Self::start_background_task(
self.backend.clone(),
self.data.best_header.clone(),
self.data.best_finalized_header.clone(),
self.data.header_by_hash_cache.clone(),
)
.await;
Ok(())
}

Expand Down Expand Up @@ -197,16 +320,27 @@ impl<C: Chain, B: Client<C>> Client<C> for CachingClient<C, B> {
}

async fn best_finalized_header_hash(&self) -> Result<HashOf<C>> {
// TODO: after https://github.com/paritytech/parity-bridges-common/issues/2074 we may
// use single-value-cache here, but for now let's just call the backend
self.backend.best_finalized_header_hash().await
self.read_header_from_background(
&self.data.best_finalized_header,
self.backend.best_finalized_header(),
)
.await
.map(|h| h.hash())
}

async fn best_header(&self) -> Result<HeaderOf<C>> {
// TODO: if after https://github.com/paritytech/parity-bridges-common/issues/2074 we'll
// be using subscriptions to get best blocks, we may use single-value-cache here, but for
// now let's just call the backend
self.backend.best_header().await
self.read_header_from_background(&self.data.best_header, self.backend.best_header())
.await
}

async fn subscribe_best_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
// we may share the sunbscription here, but atm there's no callers of this method
self.backend.subscribe_best_headers().await
}

async fn subscribe_finalized_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
// we may share the sunbscription here, but atm there's no callers of this method
self.backend.subscribe_finalized_headers().await
}

async fn subscribe_grandpa_finality_justifications(&self) -> Result<Subscription<Bytes>>
Expand Down
5 changes: 5 additions & 0 deletions bridges/relays/client-substrate/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ pub trait Client<C: Chain>: 'static + Send + Sync + Clone + Debug {
Ok(self.best_header().await?.hash())
}

/// Subscribe to new best headers.
async fn subscribe_best_headers(&self) -> Result<Subscription<HeaderOf<C>>>;
/// Subscribe to new finalized headers.
async fn subscribe_finalized_headers(&self) -> Result<Subscription<HeaderOf<C>>>;

/// Subscribe to GRANDPA finality justifications.
async fn subscribe_grandpa_finality_justifications(&self) -> Result<Subscription<Bytes>>
where
Expand Down
2 changes: 1 addition & 1 deletion bridges/relays/client-substrate/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub type RpcWithCachingClient<C> = CachingClient<C, RpcClient<C>>;
/// Creates new RPC client with caching support.
pub async fn rpc_with_caching<C: Chain>(params: ConnectionParams) -> RpcWithCachingClient<C> {
let rpc = rpc::RpcClient::<C>::new(params).await;
caching::CachingClient::new(rpc)
caching::CachingClient::new(rpc).await
}

/// The difference between best block number and number of its ancestor, that is enough
Expand Down
42 changes: 42 additions & 0 deletions bridges/relays/client-substrate/src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ impl<C: Chain> RpcClient<C> {
.await
}

/// Subscribe to finality justifications.
async fn subscribe_finality_justifications<Fut>(
&self,
gadget_name: &str,
Expand All @@ -272,6 +273,27 @@ impl<C: Chain> RpcClient<C> {
subscription,
))
}

/// Subscribe to headers stream.
async fn subscribe_headers<Fut>(
&self,
stream_name: &str,
do_subscribe: impl FnOnce(Arc<WsClient>) -> Fut + Send + 'static,
map_err: impl FnOnce(Error) -> Error,
) -> Result<Subscription<HeaderOf<C>>>
where
Fut: Future<Output = std::result::Result<RpcSubscription<HeaderOf<C>>, ClientError>> + Send,
{
let subscription = self
.jsonrpsee_execute(move |client| async move { Ok(do_subscribe(client).await?) })
.map_err(map_err)
.await?;

Ok(Subscription::new_forwarded(
StreamDescription::new(format!("{} headers", stream_name), C::NAME.into()),
subscription,
))
}
}

impl<C: Chain> Clone for RpcClient<C> {
Expand Down Expand Up @@ -356,6 +378,26 @@ impl<C: Chain> Client<C> for RpcClient<C> {
.map_err(|e| Error::failed_to_read_best_header::<C>(e))
}

async fn subscribe_best_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
self.subscribe_headers(
"best headers",
move |client| async move { SubstrateChainClient::<C>::subscribe_new_heads(&*client).await },
|e| Error::failed_to_subscribe_best_headers::<C>(e),
)
.await
}

async fn subscribe_finalized_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
self.subscribe_headers(
"best finalized headers",
move |client| async move {
SubstrateChainClient::<C>::subscribe_finalized_heads(&*client).await
},
|e| Error::failed_to_subscribe_finalized_headers::<C>(e),
)
.await
}

async fn subscribe_grandpa_finality_justifications(&self) -> Result<Subscription<Bytes>>
where
C: ChainWithGrandpa,
Expand Down
14 changes: 14 additions & 0 deletions bridges/relays/client-substrate/src/client/rpc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ pub(crate) trait SubstrateChain<C> {
/// Return signed block (with justifications) by its hash.
#[method(name = "getBlock")]
async fn block(&self, block_hash: Option<C::Hash>) -> RpcResult<C::SignedBlock>;
/// Subscribe to best headers.
#[subscription(
name = "subscribeNewHeads" => "newHead",
unsubscribe = "unsubscribeNewHeads",
item = C::Header
)]
async fn subscribe_new_heads(&self);
/// Subscribe to finalized headers.
#[subscription(
name = "subscribeFinalizedHeads" => "finalizedHead",
unsubscribe = "unsubscribeFinalizedHeads",
item = C::Header
)]
async fn subscribe_finalized_heads(&self);
}

/// RPC methods of Substrate `author` namespace, that we are using.
Expand Down
Loading

0 comments on commit 12c59b5

Please sign in to comment.