Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

proposer: wait for a hash to be in the active-leaves set #1616

Merged
11 commits merged into from
Aug 20, 2020
4 changes: 4 additions & 0 deletions node/core/proposer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ where
let mut provisioner_inherent_data = async move {
let (sender, receiver) = futures::channel::oneshot::channel();

overseer.wait_for_activation(parent_header_hash, sender).await?;
receiver.await.map_err(Error::ClosedChannelFromProvisioner)?;

let (sender, receiver) = futures::channel::oneshot::channel();
// strictly speaking, we don't _have_ to .await this send_msg before opening the
// receiver; it's possible that the response there would be ready slightly before
// this call completes. IMO it's not worth the hassle or overhead of spawning a
Expand Down
79 changes: 68 additions & 11 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use std::collections::HashSet;
use std::collections::HashMap;

use futures::channel::{mpsc, oneshot};
use futures::{
Expand Down Expand Up @@ -165,9 +165,18 @@ enum Event {
BlockImported(BlockInfo),
BlockFinalized(BlockInfo),
MsgToSubsystem(AllMessages),
ExternalRequest(ExternalRequest),
Stop,
}

/// Some request from outer world.
enum ExternalRequest {
WaitForActivation {
hash: Hash,
response_channel: oneshot::Sender<()>,
},
}

/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
Expand Down Expand Up @@ -198,6 +207,22 @@ impl OverseerHandler {
Ok(())
}

/// Wait for a block with the given hash to be in the active-leaves set.
/// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals.
///
/// The response channel responds if the hash was activated.
/// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas,
/// the response channel may never return if the hash was deactivated before this call.
/// In this case, it's the caller's responsibility to ensure a timeout is set.
pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<()>) -> SubsystemResult<()> {
self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation {
hash,
response_channel
})).await?;

Ok(())
ordian marked this conversation as resolved.
Show resolved Hide resolved
}

/// Tell `Overseer` to shutdown.
pub async fn stop(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::Stop).await?;
Expand Down Expand Up @@ -399,13 +424,17 @@ pub struct Overseer<S: SpawnNamed> {
/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,

/// External listeners waiting for a hash to be in the active-leave set.
// TODO (now): how to clean it up? Use LRUCache? Or .retain(|_, v| { v.retain(|c| !c.is_cancelled()); v })
activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<()>>>,
coriolinus marked this conversation as resolved.
Show resolved Hide resolved

/// A set of leaves that `Overseer` starts working with.
///
/// Drained at the beginning of `run` and never used again.
leaves: Vec<(Hash, BlockNumber)>,

/// The set of the "active leaves".
active_leaves: HashSet<(Hash, BlockNumber)>,
active_leaves: HashMap<Hash, BlockNumber>,

/// Various Prometheus metrics.
metrics: Metrics,
Expand Down Expand Up @@ -749,9 +778,10 @@ where
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
.collect();

let active_leaves = HashSet::new();
let active_leaves = HashMap::new();

let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry);
let activation_external_listeners = HashMap::new();

let this = Self {
candidate_validation_subsystem,
Expand All @@ -773,6 +803,7 @@ where
running_subsystems,
running_subsystems_rx,
events_rx,
activation_external_listeners,
leaves,
active_leaves,
metrics,
Expand Down Expand Up @@ -865,8 +896,8 @@ where

for leaf in leaves.into_iter() {
update.activated.push(leaf.0);
self.active_leaves.insert(leaf);
self.metrics.on_head_activated();
self.active_leaves.insert(leaf.0, leaf.1);
self.on_head_activated(&leaf.0);
}

self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
Expand All @@ -887,6 +918,9 @@ where
Event::BlockFinalized(block) => {
self.block_finalized(block).await?;
}
Event::ExternalRequest(request) => {
self.handle_external_request(request);
}
}
}

Expand Down Expand Up @@ -919,15 +953,15 @@ where
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default();

if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) {
update.deactivated.push(parent.0);
if let Some(_number) = self.active_leaves.remove(&block.parent_hash) {
coriolinus marked this conversation as resolved.
Show resolved Hide resolved
ordian marked this conversation as resolved.
Show resolved Hide resolved
update.deactivated.push(block.parent_hash);
self.metrics.on_head_deactivated();
}

if !self.active_leaves.contains(&(block.hash, block.number)) {
if self.active_leaves.get(&block.hash).is_none() {
coriolinus marked this conversation as resolved.
Show resolved Hide resolved
update.activated.push(block.hash);
self.active_leaves.insert((block.hash, block.number));
self.metrics.on_head_activated();
self.active_leaves.insert(block.hash, block.number);
self.on_head_activated(&block.hash);
}

self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
Expand All @@ -939,7 +973,7 @@ where
let mut update = ActiveLeavesUpdate::default();
let metrics = &self.metrics;

self.active_leaves.retain(|(h, n)| {
self.active_leaves.retain(|h, n| {
if *n <= block.number {
update.deactivated.push(*h);
metrics.on_head_deactivated();
Expand Down Expand Up @@ -1100,6 +1134,29 @@ where
}
}

fn on_head_activated(&mut self, hash: &Hash) {
self.metrics.on_head_activated();
if let Some(listeners) = self.activation_external_listeners.remove(&hash) {
for listener in listeners {
// it's fine if the listener is no longer interested
let _ = listener.send(());
}
}
}

fn handle_external_request(&mut self, request: ExternalRequest) {
match request {
ExternalRequest::WaitForActivation { hash, response_channel } => {
if self.active_leaves.get(&hash).is_some() {
// it's fine if the listener is no longer interested
let _ = response_channel.send(());
} else {
self.activation_external_listeners.entry(hash).or_default().push(response_channel);
}
}
}
}

fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.s.spawn(name, j);
}
Expand Down