Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
babe: replace usage of SharedEpochChanges with internal RPC (#13883)
Browse files Browse the repository at this point in the history
* babe: replace usage of SharedEpochChanges with internal RPC

* babe-rpc: fix tests

* babe: use SinkExt::send instead of Sender::try_send

SinkExt::send provides backpressure in case the channel is full

* Update client/consensus/babe/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* babe: fix spawn

* babe: send handles backpressure

* babe: use testing::TaskExecutor

* babe-rpc: better error handling

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
andresilva and bkchr authored Apr 18, 2023
1 parent af29c6f commit cb95482
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 175 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub fn new_partial(
)?;

let slot_duration = babe_link.config().slot_duration();
let import_queue = sc_consensus_babe::import_queue(
let (import_queue, babe_worker_handle) = sc_consensus_babe::import_queue(
babe_link.clone(),
block_import.clone(),
Some(Box::new(justification_import)),
Expand All @@ -228,7 +228,7 @@ pub fn new_partial(
let import_setup = (block_import, grandpa_link, babe_link);

let (rpc_extensions_builder, rpc_setup) = {
let (_, grandpa_link, babe_link) = &import_setup;
let (_, grandpa_link, _) = &import_setup;

let justification_stream = grandpa_link.justification_stream();
let shared_authority_set = grandpa_link.shared_authority_set().clone();
Expand All @@ -240,9 +240,6 @@ pub fn new_partial(
Some(shared_authority_set.clone()),
);

let babe_config = babe_link.config().clone();
let shared_epoch_changes = babe_link.epoch_changes().clone();

let client = client.clone();
let pool = transaction_pool.clone();
let select_chain = select_chain.clone();
Expand All @@ -258,9 +255,8 @@ pub fn new_partial(
chain_spec: chain_spec.cloned_box(),
deny_unsafe,
babe: node_rpc::BabeDeps {
babe_config: babe_config.clone(),
shared_epoch_changes: shared_epoch_changes.clone(),
keystore: keystore.clone(),
babe_worker_handle: babe_worker_handle.clone(),
},
grandpa: node_rpc::GrandpaDeps {
shared_voter_state: shared_voter_state.clone(),
Expand Down
1 change: 0 additions & 1 deletion bin/node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ sc-chain-spec = { version = "4.0.0-dev", path = "../../../client/chain-spec" }
sc-client-api = { version = "4.0.0-dev", path = "../../../client/api" }
sc-consensus-babe = { version = "0.10.0-dev", path = "../../../client/consensus/babe" }
sc-consensus-babe-rpc = { version = "0.10.0-dev", path = "../../../client/consensus/babe/rpc" }
sc-consensus-epochs = { version = "0.10.0-dev", path = "../../../client/consensus/epochs" }
sc-consensus-grandpa = { version = "0.10.0-dev", path = "../../../client/consensus/grandpa" }
sc-consensus-grandpa-rpc = { version = "0.10.0-dev", path = "../../../client/consensus/grandpa/rpc" }
sc-rpc = { version = "4.0.0-dev", path = "../../../client/rpc" }
Expand Down
24 changes: 7 additions & 17 deletions bin/node/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ use std::sync::Arc;
use jsonrpsee::RpcModule;
use node_primitives::{AccountId, Balance, Block, BlockNumber, Hash, Index};
use sc_client_api::AuxStore;
use sc_consensus_babe::{BabeConfiguration, Epoch};
use sc_consensus_epochs::SharedEpochChanges;
use sc_consensus_babe::BabeWorkerHandle;
use sc_consensus_grandpa::{
FinalityProofProvider, GrandpaJustificationStream, SharedAuthoritySet, SharedVoterState,
};
Expand All @@ -53,10 +52,8 @@ use sp_keystore::KeystorePtr;

/// Extra dependencies for BABE.
pub struct BabeDeps {
/// BABE protocol config.
pub babe_config: BabeConfiguration,
/// BABE pending epoch changes.
pub shared_epoch_changes: SharedEpochChanges<Block, Epoch>,
/// A handle to the BABE worker for issuing requests.
pub babe_worker_handle: BabeWorkerHandle<Block>,
/// The keystore that manages the keys of the node.
pub keystore: KeystorePtr,
}
Expand Down Expand Up @@ -130,7 +127,7 @@ where
let mut io = RpcModule::new(());
let FullDeps { client, pool, select_chain, chain_spec, deny_unsafe, babe, grandpa } = deps;

let BabeDeps { keystore, babe_config, shared_epoch_changes } = babe;
let BabeDeps { keystore, babe_worker_handle } = babe;
let GrandpaDeps {
shared_voter_state,
shared_authority_set,
Expand All @@ -151,15 +148,8 @@ where
io.merge(Mmr::new(client.clone()).into_rpc())?;
io.merge(TransactionPayment::new(client.clone()).into_rpc())?;
io.merge(
Babe::new(
client.clone(),
shared_epoch_changes.clone(),
keystore,
babe_config,
select_chain,
deny_unsafe,
)
.into_rpc(),
Babe::new(client.clone(), babe_worker_handle.clone(), keystore, select_chain, deny_unsafe)
.into_rpc(),
)?;
io.merge(
Grandpa::new(
Expand All @@ -173,7 +163,7 @@ where
)?;

io.merge(
SyncState::new(chain_spec, client.clone(), shared_authority_set, shared_epoch_changes)?
SyncState::new(chain_spec, client.clone(), shared_authority_set, babe_worker_handle)?
.into_rpc(),
)?;

Expand Down
135 changes: 66 additions & 69 deletions client/consensus/babe/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,29 @@

//! RPC api for babe.
use std::{collections::HashMap, sync::Arc};

use futures::TryFutureExt;
use jsonrpsee::{
core::{async_trait, Error as JsonRpseeError, RpcResult},
proc_macros::rpc,
types::{error::CallError, ErrorObject},
};
use serde::{Deserialize, Serialize};

use sc_consensus_babe::{authorship, Epoch};
use sc_consensus_epochs::{descendent_query, Epoch as EpochT, SharedEpochChanges};
use sc_consensus_babe::{authorship, BabeWorkerHandle};
use sc_consensus_epochs::Epoch as EpochT;
use sc_rpc_api::DenyUnsafe;
use serde::{Deserialize, Serialize};
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppCrypto;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_consensus::{Error as ConsensusError, SelectChain};
use sp_consensus_babe::{
digests::PreDigest, AuthorityId, BabeApi as BabeRuntimeApi, BabeConfiguration,
};
use sp_consensus_babe::{digests::PreDigest, AuthorityId, BabeApi as BabeRuntimeApi};
use sp_core::crypto::ByteArray;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as _};
use std::{collections::HashMap, sync::Arc};

const BABE_ERROR: i32 = 9000;

/// Provides rpc methods for interacting with Babe.
#[rpc(client, server)]
Expand All @@ -54,12 +55,10 @@ pub trait BabeApi {
pub struct Babe<B: BlockT, C, SC> {
/// shared reference to the client.
client: Arc<C>,
/// shared reference to EpochChanges
shared_epoch_changes: SharedEpochChanges<B, Epoch>,
/// A handle to the BABE worker for issuing requests.
babe_worker_handle: BabeWorkerHandle<B>,
/// shared reference to the Keystore
keystore: KeystorePtr,
/// config (actually holds the slot duration)
babe_config: BabeConfiguration,
/// The SelectChain strategy
select_chain: SC,
/// Whether to deny unsafe calls
Expand All @@ -70,13 +69,12 @@ impl<B: BlockT, C, SC> Babe<B, C, SC> {
/// Creates a new instance of the Babe Rpc handler.
pub fn new(
client: Arc<C>,
shared_epoch_changes: SharedEpochChanges<B, Epoch>,
babe_worker_handle: BabeWorkerHandle<B>,
keystore: KeystorePtr,
babe_config: BabeConfiguration,
select_chain: SC,
deny_unsafe: DenyUnsafe,
) -> Self {
Self { client, shared_epoch_changes, keystore, babe_config, select_chain, deny_unsafe }
Self { client, babe_worker_handle, keystore, select_chain, deny_unsafe }
}
}

Expand All @@ -93,21 +91,21 @@ where
{
async fn epoch_authorship(&self) -> RpcResult<HashMap<AuthorityId, EpochAuthorship>> {
self.deny_unsafe.check_if_safe()?;
let header = self.select_chain.best_chain().map_err(Error::Consensus).await?;

let best_header = self.select_chain.best_chain().map_err(Error::SelectChain).await?;

let epoch_start = self
.client
.runtime_api()
.current_epoch_start(header.hash())
.map_err(|err| Error::StringError(format!("{:?}", err)))?;

let epoch = epoch_data(
&self.shared_epoch_changes,
&self.client,
&self.babe_config,
*epoch_start,
&self.select_chain,
)
.await?;
.current_epoch_start(best_header.hash())
.map_err(|_| Error::FetchEpoch)?;

let epoch = self
.babe_worker_handle
.epoch_data_for_child_of(best_header.hash(), *best_header.number(), epoch_start)
.await
.map_err(|_| Error::FetchEpoch)?;

let (epoch_start, epoch_end) = (epoch.start_slot(), epoch.end_slot());
let mut claims: HashMap<AuthorityId, EpochAuthorship> = HashMap::new();

Expand Down Expand Up @@ -159,59 +157,37 @@ pub struct EpochAuthorship {
secondary_vrf: Vec<u64>,
}

/// Errors encountered by the RPC
/// Top-level error type for the RPC handler.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Consensus error
#[error(transparent)]
Consensus(#[from] ConsensusError),
/// Errors that can be formatted as a String
#[error("{0}")]
StringError(String),
/// Failed to fetch the current best header.
#[error("Failed to fetch the current best header: {0}")]
SelectChain(ConsensusError),
/// Failed to fetch epoch data.
#[error("Failed to fetch epoch data")]
FetchEpoch,
}

impl From<Error> for JsonRpseeError {
fn from(error: Error) -> Self {
let error_code = match error {
Error::SelectChain(_) => 1,
Error::FetchEpoch => 2,
};

JsonRpseeError::Call(CallError::Custom(ErrorObject::owned(
1234,
BABE_ERROR + error_code,
error.to_string(),
None::<()>,
Some(format!("{:?}", error)),
)))
}
}

/// Fetches the epoch data for a given slot.
async fn epoch_data<B, C, SC>(
epoch_changes: &SharedEpochChanges<B, Epoch>,
client: &Arc<C>,
babe_config: &BabeConfiguration,
slot: u64,
select_chain: &SC,
) -> Result<Epoch, Error>
where
B: BlockT,
C: HeaderBackend<B> + HeaderMetadata<B, Error = BlockChainError> + 'static,
SC: SelectChain<B>,
{
let parent = select_chain.best_chain().await?;
epoch_changes
.shared_data()
.epoch_data_for_child_of(
descendent_query(&**client),
&parent.hash(),
*parent.number(),
slot.into(),
|slot| Epoch::genesis(babe_config, slot),
)
.map_err(|e| Error::Consensus(ConsensusError::ChainLookup(e.to_string())))?
.ok_or(Error::Consensus(ConsensusError::InvalidAuthoritiesSet))
}

#[cfg(test)]
mod tests {
use super::*;
use sc_consensus_babe::block_import;
use sp_core::crypto::key_types::BABE;
use sp_consensus_babe::inherents::InherentDataProvider;
use sp_core::{crypto::key_types::BABE, testing::TaskExecutor};
use sp_keyring::Sr25519Keyring;
use sp_keystore::{testing::MemoryKeystore, Keystore};
use substrate_test_runtime_client::{
Expand All @@ -233,14 +209,35 @@ mod tests {
let builder = TestClientBuilder::new();
let (client, longest_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let task_executor = TaskExecutor::new();
let keystore = create_keystore(Sr25519Keyring::Alice);

let config = sc_consensus_babe::configuration(&*client).expect("config available");
let (_, link) = block_import(config.clone(), client.clone(), client.clone())
.expect("can initialize block-import");
let slot_duration = config.slot_duration();

let epoch_changes = link.epoch_changes().clone();
let keystore = create_keystore(Sr25519Keyring::Alice);
let (block_import, link) =
sc_consensus_babe::block_import(config.clone(), client.clone(), client.clone())
.expect("can initialize block-import");

let (_, babe_worker_handle) = sc_consensus_babe::import_queue(
link.clone(),
block_import.clone(),
None,
client.clone(),
longest_chain.clone(),
move |_, _| async move {
Ok((InherentDataProvider::from_timestamp_and_slot_duration(
0.into(),
slot_duration,
),))
},
&task_executor,
None,
None,
)
.unwrap();

Babe::new(client.clone(), epoch_changes, keystore, config, longest_chain, deny_unsafe)
Babe::new(client.clone(), babe_worker_handle, keystore, longest_chain, deny_unsafe)
}

#[tokio::test]
Expand Down
Loading

0 comments on commit cb95482

Please sign in to comment.