Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
babe: introduce a request-answering mechanic (#7833)
Browse files Browse the repository at this point in the history
* babe: introduce a request-answering mechanic

* gromble

* send method
  • Loading branch information
rphmeier authored Mar 4, 2021
1 parent a8c2bc6 commit fc2d2d3
Showing 1 changed file with 87 additions and 3 deletions.
90 changes: 87 additions & 3 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::channel::mpsc::{channel, Sender, Receiver};
use futures::channel::oneshot;
use retain_mut::RetainMut;

use futures::prelude::*;
Expand Down Expand Up @@ -426,6 +427,8 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
CAW: CanAuthorWith<B> + Send + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;

let config = babe_link.config;
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

Expand All @@ -444,32 +447,108 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {

register_babe_inherent_data_provider(&inherent_data_providers, config.slot_duration())?;
sc_consensus_uncles::register_uncles_inherent_data_provider(
client,
client.clone(),
select_chain.clone(),
&inherent_data_providers,
)?;

info!(target: "babe", "👶 Starting BABE Authorship worker");
let inner = sc_consensus_slots::start_slot_worker(
config.0,
config.0.clone(),
select_chain,
worker,
sync_oracle,
inherent_data_providers,
babe_link.time_source,
can_author_with,
);

let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);

let answer_requests = answer_requests(worker_rx, config.0, client, babe_link.epoch_changes.clone());
Ok(BabeWorker {
inner: Box::pin(inner),
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}

async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
genesis_config: sc_consensus_slots::SlotDuration<BabeGenesisConfiguration>,
client: Arc<C>,
epoch_changes: SharedEpochChanges<B, Epoch>,
)
where C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError> + Send + Sync + 'static,
{
while let Some(request) = request_rx.next().await {
match request {
BabeRequest::EpochForChild(parent_hash, parent_number, slot_number, response) => {
let lookup = || {
let epoch_changes = epoch_changes.lock();
let epoch_descriptor = epoch_changes.epoch_descriptor_for_child_of(
descendent_query(&*client),
&parent_hash,
parent_number,
slot_number,
)
.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
.ok_or_else(|| Error::<B>::FetchEpoch(parent_hash))?;

let viable_epoch = epoch_changes.viable_epoch(
&epoch_descriptor,
|slot| Epoch::genesis(&genesis_config, slot)
).ok_or_else(|| Error::<B>::FetchEpoch(parent_hash))?;

Ok(sp_consensus_babe::Epoch {
epoch_index: viable_epoch.as_ref().epoch_index,
start_slot: viable_epoch.as_ref().start_slot,
duration: viable_epoch.as_ref().duration,
authorities: viable_epoch.as_ref().authorities.clone(),
randomness: viable_epoch.as_ref().randomness,
})
};

let _ = response.send(lookup());
}
}
}
}

/// Requests to the BABE service.
#[non_exhaustive]
pub enum BabeRequest<B: BlockT> {
/// Request the epoch that a child of the given block, with the given slot number would have.
///
/// The parent block is identified by its hash and number.
EpochForChild(
B::Hash,
NumberFor<B>,
Slot,
oneshot::Sender<Result<sp_consensus_babe::Epoch, Error<B>>>,
),
}

/// A handle to the BABE worker for issuing requests.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);

impl<B: BlockT> BabeWorkerHandle<B> {
/// Send a request to the BABE service.
pub async fn send(&mut self, request: BabeRequest<B>) {
// Failure to send means that the service is down.
// This will manifest as the receiver of the request being dropped.
let _ = self.0.send(request).await;
}
}

/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output=()> + Send + 'static>>,
slot_notification_sinks: SlotNotificationSinks<B>,
handle: BabeWorkerHandle<B>,
}

impl<B: BlockT> BabeWorker<B> {
Expand All @@ -484,6 +563,11 @@ impl<B: BlockT> BabeWorker<B> {
self.slot_notification_sinks.lock().push(sink);
stream
}

/// Get a handle to the worker.
pub fn handle(&self) -> BabeWorkerHandle<B> {
self.handle.clone()
}
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
Expand Down

0 comments on commit fc2d2d3

Please sign in to comment.