From cb954820a8d8d765ce75021e244223a3b4d5722d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= <123550+andresilva@users.noreply.github.com> Date: Tue, 18 Apr 2023 10:38:04 +0100 Subject: [PATCH] babe: replace usage of SharedEpochChanges with internal RPC (#13883) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * babe: replace usage of SharedEpochChanges with internal RPC * babe-rpc: fix tests * babe: use SinkExt::send instead of Sender::try_send SinkExt::send provides backpressure in case the channel is full * Update client/consensus/babe/src/lib.rs Co-authored-by: Bastian Köcher * babe: fix spawn * babe: send handles backpressure * babe: use testing::TaskExecutor * babe-rpc: better error handling --------- Co-authored-by: Bastian Köcher --- Cargo.lock | 1 - bin/node/cli/src/service.rs | 10 +- bin/node/rpc/Cargo.toml | 1 - bin/node/rpc/src/lib.rs | 24 ++--- client/consensus/babe/rpc/src/lib.rs | 135 +++++++++++++------------- client/consensus/babe/src/lib.rs | 140 ++++++++++++++------------- client/sync-state-rpc/src/lib.rs | 35 ++++--- 7 files changed, 171 insertions(+), 175 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb8081221f0e2..b7d3bd5ee41e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5142,7 +5142,6 @@ dependencies = [ "sc-client-api", "sc-consensus-babe", "sc-consensus-babe-rpc", - "sc-consensus-epochs", "sc-consensus-grandpa", "sc-consensus-grandpa-rpc", "sc-rpc", diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 7b0dfcda6ff0e..de640180574ce 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -203,7 +203,7 @@ pub fn new_partial( )?; let slot_duration = babe_link.config().slot_duration(); - let import_queue = sc_consensus_babe::import_queue( + let (import_queue, babe_worker_handle) = sc_consensus_babe::import_queue( babe_link.clone(), block_import.clone(), Some(Box::new(justification_import)), @@ -228,7 +228,7 @@ pub fn new_partial( let import_setup = (block_import, grandpa_link, babe_link); let (rpc_extensions_builder, rpc_setup) = { - let (_, grandpa_link, babe_link) = &import_setup; + let (_, grandpa_link, _) = &import_setup; let justification_stream = grandpa_link.justification_stream(); let shared_authority_set = grandpa_link.shared_authority_set().clone(); @@ -240,9 +240,6 @@ pub fn new_partial( Some(shared_authority_set.clone()), ); - let babe_config = babe_link.config().clone(); - let shared_epoch_changes = babe_link.epoch_changes().clone(); - let client = client.clone(); let pool = transaction_pool.clone(); let select_chain = select_chain.clone(); @@ -258,9 +255,8 @@ pub fn new_partial( chain_spec: chain_spec.cloned_box(), deny_unsafe, babe: node_rpc::BabeDeps { - babe_config: babe_config.clone(), - shared_epoch_changes: shared_epoch_changes.clone(), keystore: keystore.clone(), + babe_worker_handle: babe_worker_handle.clone(), }, grandpa: node_rpc::GrandpaDeps { shared_voter_state: shared_voter_state.clone(), diff --git a/bin/node/rpc/Cargo.toml b/bin/node/rpc/Cargo.toml index 0ea6f49bd6094..724efbe9a5721 100644 --- a/bin/node/rpc/Cargo.toml +++ b/bin/node/rpc/Cargo.toml @@ -21,7 +21,6 @@ sc-chain-spec = { version = "4.0.0-dev", path = "../../../client/chain-spec" } sc-client-api = { version = "4.0.0-dev", path = "../../../client/api" } sc-consensus-babe = { version = "0.10.0-dev", path = "../../../client/consensus/babe" } sc-consensus-babe-rpc = { version = "0.10.0-dev", path = "../../../client/consensus/babe/rpc" } -sc-consensus-epochs = { version = "0.10.0-dev", path = "../../../client/consensus/epochs" } sc-consensus-grandpa = { version = "0.10.0-dev", path = "../../../client/consensus/grandpa" } sc-consensus-grandpa-rpc = { version = "0.10.0-dev", path = "../../../client/consensus/grandpa/rpc" } sc-rpc = { version = "4.0.0-dev", path = "../../../client/rpc" } diff --git a/bin/node/rpc/src/lib.rs b/bin/node/rpc/src/lib.rs index 9dcdf0f218923..5f61fdcd55d97 100644 --- a/bin/node/rpc/src/lib.rs +++ b/bin/node/rpc/src/lib.rs @@ -36,8 +36,7 @@ use std::sync::Arc; use jsonrpsee::RpcModule; use node_primitives::{AccountId, Balance, Block, BlockNumber, Hash, Index}; use sc_client_api::AuxStore; -use sc_consensus_babe::{BabeConfiguration, Epoch}; -use sc_consensus_epochs::SharedEpochChanges; +use sc_consensus_babe::BabeWorkerHandle; use sc_consensus_grandpa::{ FinalityProofProvider, GrandpaJustificationStream, SharedAuthoritySet, SharedVoterState, }; @@ -53,10 +52,8 @@ use sp_keystore::KeystorePtr; /// Extra dependencies for BABE. pub struct BabeDeps { - /// BABE protocol config. - pub babe_config: BabeConfiguration, - /// BABE pending epoch changes. - pub shared_epoch_changes: SharedEpochChanges, + /// A handle to the BABE worker for issuing requests. + pub babe_worker_handle: BabeWorkerHandle, /// The keystore that manages the keys of the node. pub keystore: KeystorePtr, } @@ -130,7 +127,7 @@ where let mut io = RpcModule::new(()); let FullDeps { client, pool, select_chain, chain_spec, deny_unsafe, babe, grandpa } = deps; - let BabeDeps { keystore, babe_config, shared_epoch_changes } = babe; + let BabeDeps { keystore, babe_worker_handle } = babe; let GrandpaDeps { shared_voter_state, shared_authority_set, @@ -151,15 +148,8 @@ where io.merge(Mmr::new(client.clone()).into_rpc())?; io.merge(TransactionPayment::new(client.clone()).into_rpc())?; io.merge( - Babe::new( - client.clone(), - shared_epoch_changes.clone(), - keystore, - babe_config, - select_chain, - deny_unsafe, - ) - .into_rpc(), + Babe::new(client.clone(), babe_worker_handle.clone(), keystore, select_chain, deny_unsafe) + .into_rpc(), )?; io.merge( Grandpa::new( @@ -173,7 +163,7 @@ where )?; io.merge( - SyncState::new(chain_spec, client.clone(), shared_authority_set, shared_epoch_changes)? + SyncState::new(chain_spec, client.clone(), shared_authority_set, babe_worker_handle)? .into_rpc(), )?; diff --git a/client/consensus/babe/rpc/src/lib.rs b/client/consensus/babe/rpc/src/lib.rs index cdc8dbfd80a34..1ae15cc5453d7 100644 --- a/client/consensus/babe/rpc/src/lib.rs +++ b/client/consensus/babe/rpc/src/lib.rs @@ -18,28 +18,29 @@ //! RPC api for babe. +use std::{collections::HashMap, sync::Arc}; + use futures::TryFutureExt; use jsonrpsee::{ core::{async_trait, Error as JsonRpseeError, RpcResult}, proc_macros::rpc, types::{error::CallError, ErrorObject}, }; +use serde::{Deserialize, Serialize}; -use sc_consensus_babe::{authorship, Epoch}; -use sc_consensus_epochs::{descendent_query, Epoch as EpochT, SharedEpochChanges}; +use sc_consensus_babe::{authorship, BabeWorkerHandle}; +use sc_consensus_epochs::Epoch as EpochT; use sc_rpc_api::DenyUnsafe; -use serde::{Deserialize, Serialize}; use sp_api::ProvideRuntimeApi; use sp_application_crypto::AppCrypto; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_consensus::{Error as ConsensusError, SelectChain}; -use sp_consensus_babe::{ - digests::PreDigest, AuthorityId, BabeApi as BabeRuntimeApi, BabeConfiguration, -}; +use sp_consensus_babe::{digests::PreDigest, AuthorityId, BabeApi as BabeRuntimeApi}; use sp_core::crypto::ByteArray; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Header as _}; -use std::{collections::HashMap, sync::Arc}; + +const BABE_ERROR: i32 = 9000; /// Provides rpc methods for interacting with Babe. #[rpc(client, server)] @@ -54,12 +55,10 @@ pub trait BabeApi { pub struct Babe { /// shared reference to the client. client: Arc, - /// shared reference to EpochChanges - shared_epoch_changes: SharedEpochChanges, + /// A handle to the BABE worker for issuing requests. + babe_worker_handle: BabeWorkerHandle, /// shared reference to the Keystore keystore: KeystorePtr, - /// config (actually holds the slot duration) - babe_config: BabeConfiguration, /// The SelectChain strategy select_chain: SC, /// Whether to deny unsafe calls @@ -70,13 +69,12 @@ impl Babe { /// Creates a new instance of the Babe Rpc handler. pub fn new( client: Arc, - shared_epoch_changes: SharedEpochChanges, + babe_worker_handle: BabeWorkerHandle, keystore: KeystorePtr, - babe_config: BabeConfiguration, select_chain: SC, deny_unsafe: DenyUnsafe, ) -> Self { - Self { client, shared_epoch_changes, keystore, babe_config, select_chain, deny_unsafe } + Self { client, babe_worker_handle, keystore, select_chain, deny_unsafe } } } @@ -93,21 +91,21 @@ where { async fn epoch_authorship(&self) -> RpcResult> { self.deny_unsafe.check_if_safe()?; - let header = self.select_chain.best_chain().map_err(Error::Consensus).await?; + + let best_header = self.select_chain.best_chain().map_err(Error::SelectChain).await?; + let epoch_start = self .client .runtime_api() - .current_epoch_start(header.hash()) - .map_err(|err| Error::StringError(format!("{:?}", err)))?; - - let epoch = epoch_data( - &self.shared_epoch_changes, - &self.client, - &self.babe_config, - *epoch_start, - &self.select_chain, - ) - .await?; + .current_epoch_start(best_header.hash()) + .map_err(|_| Error::FetchEpoch)?; + + let epoch = self + .babe_worker_handle + .epoch_data_for_child_of(best_header.hash(), *best_header.number(), epoch_start) + .await + .map_err(|_| Error::FetchEpoch)?; + let (epoch_start, epoch_end) = (epoch.start_slot(), epoch.end_slot()); let mut claims: HashMap = HashMap::new(); @@ -159,59 +157,37 @@ pub struct EpochAuthorship { secondary_vrf: Vec, } -/// Errors encountered by the RPC +/// Top-level error type for the RPC handler. #[derive(Debug, thiserror::Error)] pub enum Error { - /// Consensus error - #[error(transparent)] - Consensus(#[from] ConsensusError), - /// Errors that can be formatted as a String - #[error("{0}")] - StringError(String), + /// Failed to fetch the current best header. + #[error("Failed to fetch the current best header: {0}")] + SelectChain(ConsensusError), + /// Failed to fetch epoch data. + #[error("Failed to fetch epoch data")] + FetchEpoch, } impl From for JsonRpseeError { fn from(error: Error) -> Self { + let error_code = match error { + Error::SelectChain(_) => 1, + Error::FetchEpoch => 2, + }; + JsonRpseeError::Call(CallError::Custom(ErrorObject::owned( - 1234, + BABE_ERROR + error_code, error.to_string(), - None::<()>, + Some(format!("{:?}", error)), ))) } } -/// Fetches the epoch data for a given slot. -async fn epoch_data( - epoch_changes: &SharedEpochChanges, - client: &Arc, - babe_config: &BabeConfiguration, - slot: u64, - select_chain: &SC, -) -> Result -where - B: BlockT, - C: HeaderBackend + HeaderMetadata + 'static, - SC: SelectChain, -{ - let parent = select_chain.best_chain().await?; - epoch_changes - .shared_data() - .epoch_data_for_child_of( - descendent_query(&**client), - &parent.hash(), - *parent.number(), - slot.into(), - |slot| Epoch::genesis(babe_config, slot), - ) - .map_err(|e| Error::Consensus(ConsensusError::ChainLookup(e.to_string())))? - .ok_or(Error::Consensus(ConsensusError::InvalidAuthoritiesSet)) -} - #[cfg(test)] mod tests { use super::*; - use sc_consensus_babe::block_import; - use sp_core::crypto::key_types::BABE; + use sp_consensus_babe::inherents::InherentDataProvider; + use sp_core::{crypto::key_types::BABE, testing::TaskExecutor}; use sp_keyring::Sr25519Keyring; use sp_keystore::{testing::MemoryKeystore, Keystore}; use substrate_test_runtime_client::{ @@ -233,14 +209,35 @@ mod tests { let builder = TestClientBuilder::new(); let (client, longest_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); + let task_executor = TaskExecutor::new(); + let keystore = create_keystore(Sr25519Keyring::Alice); + let config = sc_consensus_babe::configuration(&*client).expect("config available"); - let (_, link) = block_import(config.clone(), client.clone(), client.clone()) - .expect("can initialize block-import"); + let slot_duration = config.slot_duration(); - let epoch_changes = link.epoch_changes().clone(); - let keystore = create_keystore(Sr25519Keyring::Alice); + let (block_import, link) = + sc_consensus_babe::block_import(config.clone(), client.clone(), client.clone()) + .expect("can initialize block-import"); + + let (_, babe_worker_handle) = sc_consensus_babe::import_queue( + link.clone(), + block_import.clone(), + None, + client.clone(), + longest_chain.clone(), + move |_, _| async move { + Ok((InherentDataProvider::from_timestamp_and_slot_duration( + 0.into(), + slot_duration, + ),)) + }, + &task_executor, + None, + None, + ) + .unwrap(); - Babe::new(client.clone(), epoch_changes, keystore, config, longest_chain, deny_unsafe) + Babe::new(client.clone(), babe_worker_handle, keystore, longest_chain, deny_unsafe) } #[tokio::test] diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index e540cae1f7e76..6327c8c657bb6 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -89,8 +89,8 @@ use prometheus_endpoint::Registry; use schnorrkel::SignatureError; use sc_client_api::{ - backend::AuxStore, AuxDataOperations, Backend as BackendT, BlockchainEvents, - FinalityNotification, PreCommitActions, ProvideUncles, UsageProvider, + backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification, + PreCommitActions, UsageProvider, }; use sc_consensus::{ block_import::{ @@ -338,6 +338,9 @@ pub enum Error { /// Create inherents error. #[error("Creating inherents failed: {0}")] CreateInherents(sp_inherents::Error), + /// Background worker is not running and therefore requests cannot be answered. + #[error("Background worker is not running")] + BackgroundWorkerTerminated, /// Client error #[error(transparent)] Client(sp_blockchain::Error), @@ -475,9 +478,6 @@ pub fn start_babe( where B: BlockT, C: ProvideRuntimeApi - + ProvideUncles - + BlockchainEvents - + PreCommitActions + HeaderBackend + HeaderMetadata + Send @@ -498,8 +498,6 @@ where BS: BackoffAuthoringBlocksStrategy> + Send + Sync + 'static, Error: std::error::Error + Send + From + From + 'static, { - const HANDLE_BUFFER_SIZE: usize = 1024; - let slot_notification_sinks = Arc::new(Mutex::new(Vec::new())); let worker = BabeSlotWorker { @@ -529,17 +527,7 @@ where create_inherent_data_providers, ); - let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE); - - let answer_requests = - answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes); - - let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests)); - Ok(BabeWorker { - inner: Box::pin(inner.map(|_| ())), - slot_notification_sinks, - handle: BabeWorkerHandle(worker_tx), - }) + Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks }) } // Remove obsolete block's weight data by leveraging finality notifications. @@ -593,42 +581,26 @@ async fn answer_requests( client: Arc, epoch_changes: SharedEpochChanges, ) where - C: ProvideRuntimeApi - + ProvideUncles - + BlockchainEvents - + HeaderBackend - + HeaderMetadata - + Send - + Sync - + 'static, + C: HeaderBackend + HeaderMetadata, { while let Some(request) = request_rx.next().await { match request { - BabeRequest::EpochForChild(parent_hash, parent_number, slot_number, response) => { + BabeRequest::EpochData(response) => { + let _ = response.send(epoch_changes.shared_data().clone()); + }, + BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => { let lookup = || { let epoch_changes = epoch_changes.shared_data(); - let epoch_descriptor = epoch_changes - .epoch_descriptor_for_child_of( + epoch_changes + .epoch_data_for_child_of( descendent_query(&*client), &parent_hash, parent_number, - slot_number, + slot, + |slot| Epoch::genesis(&config, slot), ) .map_err(|e| Error::::ForkTree(Box::new(e)))? - .ok_or(Error::::FetchEpoch(parent_hash))?; - - let viable_epoch = epoch_changes - .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot)) - .ok_or(Error::::FetchEpoch(parent_hash))?; - - Ok(sp_consensus_babe::Epoch { - epoch_index: viable_epoch.as_ref().epoch_index, - start_slot: viable_epoch.as_ref().start_slot, - duration: viable_epoch.as_ref().duration, - authorities: viable_epoch.as_ref().authorities.clone(), - randomness: viable_epoch.as_ref().randomness, - config: viable_epoch.as_ref().config.clone(), - }) + .ok_or(Error::::FetchEpoch(parent_hash)) }; let _ = response.send(lookup()); @@ -638,17 +610,13 @@ async fn answer_requests( } /// Requests to the BABE service. -#[non_exhaustive] -pub enum BabeRequest { +enum BabeRequest { + /// Request all available epoch data. + EpochData(oneshot::Sender>), /// Request the epoch that a child of the given block, with the given slot number would have. /// /// The parent block is identified by its hash and number. - EpochForChild( - B::Hash, - NumberFor, - Slot, - oneshot::Sender>>, - ), + EpochDataForChildOf(B::Hash, NumberFor, Slot, oneshot::Sender>>), } /// A handle to the BABE worker for issuing requests. @@ -656,11 +624,41 @@ pub enum BabeRequest { pub struct BabeWorkerHandle(Sender>); impl BabeWorkerHandle { - /// Send a request to the BABE service. - pub async fn send(&mut self, request: BabeRequest) { - // Failure to send means that the service is down. - // This will manifest as the receiver of the request being dropped. - let _ = self.0.send(request).await; + async fn send_request(&self, request: BabeRequest) -> Result<(), Error> { + match self.0.clone().send(request).await { + Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated), + Err(err) => warn!( + target: LOG_TARGET, + "Unhandled error when sending request to worker: {:?}", err + ), + _ => {}, + } + + Ok(()) + } + + /// Fetch all available epoch data. + pub async fn epoch_data(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + self.send_request(BabeRequest::EpochData(tx)).await?; + + rx.await.or(Err(Error::BackgroundWorkerTerminated)) + } + + /// Fetch the epoch that a child of the given block, with the given slot number would have. + /// + /// The parent block is identified by its hash and number. + pub async fn epoch_data_for_child_of( + &self, + parent_hash: B::Hash, + parent_number: NumberFor, + slot: Slot, + ) -> Result> { + let (tx, rx) = oneshot::channel(); + self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx)) + .await?; + + rx.await.or(Err(Error::BackgroundWorkerTerminated))? } } @@ -669,7 +667,6 @@ impl BabeWorkerHandle { pub struct BabeWorker { inner: Pin + Send + 'static>>, slot_notification_sinks: SlotNotificationSinks, - handle: BabeWorkerHandle, } impl BabeWorker { @@ -684,11 +681,6 @@ impl BabeWorker { self.slot_notification_sinks.lock().push(sink); stream } - - /// Get a handle to the worker. - pub fn handle(&self) -> BabeWorkerHandle { - self.handle.clone() - } } impl Future for BabeWorker { @@ -1790,7 +1782,7 @@ pub fn import_queue( spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, telemetry: Option, -) -> ClientResult> +) -> ClientResult<(DefaultImportQueue, BabeWorkerHandle)> where Inner: BlockImport< Block, @@ -1811,16 +1803,28 @@ where CIDP: CreateInherentDataProviders + Send + Sync + 'static, CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { + const HANDLE_BUFFER_SIZE: usize = 1024; + let verifier = BabeVerifier { select_chain, create_inherent_data_providers, - config: babe_link.config, - epoch_changes: babe_link.epoch_changes, + config: babe_link.config.clone(), + epoch_changes: babe_link.epoch_changes.clone(), telemetry, - client, + client: client.clone(), }; - Ok(BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry)) + let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE); + + let answer_requests = + answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes); + + spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed()); + + Ok(( + BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry), + BabeWorkerHandle(worker_tx), + )) } /// Reverts protocol aux data to at most the last finalized block. diff --git a/client/sync-state-rpc/src/lib.rs b/client/sync-state-rpc/src/lib.rs index 78d5cafa31e26..dda8a7edfa9bb 100644 --- a/client/sync-state-rpc/src/lib.rs +++ b/client/sync-state-rpc/src/lib.rs @@ -41,20 +41,21 @@ #![deny(unused_crate_dependencies)] +use std::sync::Arc; + use jsonrpsee::{ - core::{Error as JsonRpseeError, RpcResult}, + core::{async_trait, Error as JsonRpseeError, RpcResult}, proc_macros::rpc, types::{error::CallError, ErrorObject}, }; + use sc_client_api::StorageData; +use sc_consensus_babe::{BabeWorkerHandle, Error as BabeError}; use sp_blockchain::HeaderBackend; use sp_runtime::traits::{Block as BlockT, NumberFor}; -use std::sync::Arc; type SharedAuthoritySet = sc_consensus_grandpa::SharedAuthoritySet<::Hash, NumberFor>; -type SharedEpochChanges = - sc_consensus_epochs::SharedEpochChanges; /// Error type used by this crate. #[derive(Debug, thiserror::Error)] @@ -66,6 +67,9 @@ pub enum Error { #[error("Failed to load the block weight for block {0:?}")] LoadingBlockWeightFailed(Block::Hash), + #[error("Failed to load the BABE epoch data: {0}")] + LoadingEpochDataFailed(BabeError), + #[error("JsonRpc error: {0}")] JsonRpc(String), @@ -125,7 +129,7 @@ pub struct LightSyncState { pub trait SyncStateApi { /// Returns the JSON serialized chainspec running the node, with a sync state. #[method(name = "sync_state_genSyncSpec")] - fn system_gen_sync_spec(&self, raw: bool) -> RpcResult; + async fn system_gen_sync_spec(&self, raw: bool) -> RpcResult; } /// An api for sync state RPC calls. @@ -133,7 +137,7 @@ pub struct SyncState { chain_spec: Box, client: Arc, shared_authority_set: SharedAuthoritySet, - shared_epoch_changes: SharedEpochChanges, + babe_worker_handle: BabeWorkerHandle, } impl SyncState @@ -146,18 +150,24 @@ where chain_spec: Box, client: Arc, shared_authority_set: SharedAuthoritySet, - shared_epoch_changes: SharedEpochChanges, + babe_worker_handle: BabeWorkerHandle, ) -> Result> { if sc_chain_spec::get_extension::(chain_spec.extensions()) .is_some() { - Ok(Self { chain_spec, client, shared_authority_set, shared_epoch_changes }) + Ok(Self { chain_spec, client, shared_authority_set, babe_worker_handle }) } else { Err(Error::::LightSyncStateExtensionNotFound) } } - fn build_sync_state(&self) -> Result, Error> { + async fn build_sync_state(&self) -> Result, Error> { + let epoch_changes = self + .babe_worker_handle + .epoch_data() + .await + .map_err(Error::LoadingEpochDataFailed)?; + let finalized_hash = self.client.info().finalized_hash; let finalized_header = self .client @@ -170,20 +180,21 @@ where Ok(LightSyncState { finalized_block_header: finalized_header, - babe_epoch_changes: self.shared_epoch_changes.shared_data().clone(), + babe_epoch_changes: epoch_changes, babe_finalized_block_weight: finalized_block_weight, grandpa_authority_set: self.shared_authority_set.clone_inner(), }) } } +#[async_trait] impl SyncStateApiServer for SyncState where Block: BlockT, Backend: HeaderBackend + sc_client_api::AuxStore + 'static, { - fn system_gen_sync_spec(&self, raw: bool) -> RpcResult { - let current_sync_state = self.build_sync_state()?; + async fn system_gen_sync_spec(&self, raw: bool) -> RpcResult { + let current_sync_state = self.build_sync_state().await?; let mut chain_spec = self.chain_spec.cloned_box(); let extension = sc_chain_spec::get_extension_mut::(