Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

babe: introduce a request-answering mechanic #7833

Merged
merged 6 commits into from
Mar 4, 2021
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 78 additions & 3 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::channel::mpsc::{channel, Sender, Receiver};
use futures::channel::oneshot;
use retain_mut::RetainMut;

use futures::prelude::*;
Expand Down Expand Up @@ -428,6 +429,8 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
CAW: CanAuthorWith<B> + Send + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;

let config = babe_link.config;
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

Expand All @@ -446,32 +449,99 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {

register_babe_inherent_data_provider(&inherent_data_providers, config.slot_duration())?;
sc_consensus_uncles::register_uncles_inherent_data_provider(
client,
client.clone(),
select_chain.clone(),
&inherent_data_providers,
)?;

info!(target: "babe", "👶 Starting BABE Authorship worker");
let inner = sc_consensus_slots::start_slot_worker(
config.0,
config.0.clone(),
select_chain,
worker,
sync_oracle,
inherent_data_providers,
babe_link.time_source,
can_author_with,
);

let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);

let answer_requests = answer_requests(worker_rx, config.0, client, babe_link.epoch_changes.clone());
Ok(BabeWorker {
inner: Box::pin(inner),
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}

async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
genesis_config: sc_consensus_slots::SlotDuration<BabeGenesisConfiguration>,
client: Arc<C>,
epoch_changes: SharedEpochChanges<B, Epoch>,
)
where C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError> + Send + Sync + 'static,
{
while let Some(request) = request_rx.next().await {
match request {
BabeRequest::EpochForChild(parent_hash, parent_number, slot_number, response) => {
let lookup = || {
let epoch_changes = epoch_changes.lock();
let epoch_descriptor = epoch_changes.epoch_descriptor_for_child_of(
descendent_query(&*client),
&parent_hash,
parent_number,
slot_number,
)
.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
.ok_or_else(|| Error::<B>::FetchEpoch(parent_hash))?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

let viable_epoch = epoch_changes.viable_epoch(
&epoch_descriptor,
|slot| Epoch::genesis(&genesis_config, slot)
).ok_or_else(|| Error::<B>::FetchEpoch(parent_hash))?;

Ok(sp_consensus_babe::Epoch {
epoch_index: viable_epoch.as_ref().epoch_index,
start_slot: viable_epoch.as_ref().start_slot,
duration: viable_epoch.as_ref().duration,
authorities: viable_epoch.as_ref().authorities.clone(),
randomness: viable_epoch.as_ref().randomness,
})
};

let _ = response.send(lookup());
}
}
}
}

/// Requests to the BABE service.
#[non_exhaustive]
pub enum BabeRequest<B: BlockT> {
/// Request the epoch that a child of the given block, with the given slot number would have.
///
/// The parent block is identified by its hash and number.
EpochForChild(
B::Hash,
NumberFor<B>,
SlotNumber,
oneshot::Sender<Result<sp_consensus_babe::Epoch, Error<B>>>,
),
}

/// A handle to the BABE worker for issuing requests.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);

/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output=()> + Send + 'static>>,
slot_notification_sinks: Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)>>>>,
handle: BabeWorkerHandle<B>,
}

impl<B: BlockT> BabeWorker<B> {
Expand All @@ -486,6 +556,11 @@ impl<B: BlockT> BabeWorker<B> {
self.slot_notification_sinks.lock().push(sink);
stream
}

/// Get a handle to the worker.
pub fn handle(&self) -> BabeWorkerHandle<B> {
self.handle.clone()
bkchr marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
Expand Down