From 6f423a1a7f37067843f4a9ceeedc0fa58ea69caf Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 19 Aug 2020 14:20:33 +0200 Subject: [PATCH 01/10] overseer: add ExternalRequest to Event --- node/overseer/src/lib.rs | 79 ++++++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 321a20f568f5..0fbfb8f6c647 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -59,7 +59,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use std::collections::HashSet; +use std::collections::HashMap; use futures::channel::{mpsc, oneshot}; use futures::{ @@ -165,9 +165,18 @@ enum Event { BlockImported(BlockInfo), BlockFinalized(BlockInfo), MsgToSubsystem(AllMessages), + ExternalRequest(ExternalRequest), Stop, } +/// Some request from outer world. +enum ExternalRequest { + WaitForActivation { + hash: Hash, + response_channel: oneshot::Sender<()>, + }, +} + /// A handler used to communicate with the [`Overseer`]. /// /// [`Overseer`]: struct.Overseer.html @@ -198,6 +207,22 @@ impl OverseerHandler { Ok(()) } + /// Wait for a block with the given hash to be in the active-leaves set. + /// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals. + /// + /// The response channel responds if the hash was activated. + /// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas, + /// the response channel may never return if the hash was activated before this call. + /// In this case, it's the caller's responsibility to ensure a timeout is set. + pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<()>) -> SubsystemResult<()> { + self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation { + hash, + response_channel + })).await?; + + Ok(()) + } + /// Tell `Overseer` to shutdown. pub async fn stop(&mut self) -> SubsystemResult<()> { self.events_tx.send(Event::Stop).await?; @@ -399,13 +424,17 @@ pub struct Overseer { /// Events that are sent to the overseer from the outside world events_rx: mpsc::Receiver, + /// External listeners waiting for a hash to be in the active-leave set. + // TODO (now): how to clean it up? Use LRUCache? + activation_external_listeners: HashMap>>, + /// A set of leaves that `Overseer` starts working with. /// /// Drained at the beginning of `run` and never used again. leaves: Vec<(Hash, BlockNumber)>, /// The set of the "active leaves". - active_leaves: HashSet<(Hash, BlockNumber)>, + active_leaves: HashMap, /// Various Prometheus metrics. metrics: Metrics, @@ -749,9 +778,10 @@ where .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) .collect(); - let active_leaves = HashSet::new(); + let active_leaves = HashMap::new(); let metrics = ::register(prometheus_registry); + let activation_external_listeners = HashMap::new(); let this = Self { candidate_validation_subsystem, @@ -773,6 +803,7 @@ where running_subsystems, running_subsystems_rx, events_rx, + activation_external_listeners, leaves, active_leaves, metrics, @@ -865,8 +896,8 @@ where for leaf in leaves.into_iter() { update.activated.push(leaf.0); - self.active_leaves.insert(leaf); - self.metrics.on_head_activated(); + self.active_leaves.insert(leaf.0, leaf.1); + self.on_head_activated(&leaf.0); } self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; @@ -887,6 +918,9 @@ where Event::BlockFinalized(block) => { self.block_finalized(block).await?; } + Event::ExternalRequest(request) => { + self.handle_external_request(request); + } } } @@ -919,15 +953,15 @@ where async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { let mut update = ActiveLeavesUpdate::default(); - if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) { - update.deactivated.push(parent.0); + if let Some(_number) = self.active_leaves.remove(&block.parent_hash) { + update.deactivated.push(block.parent_hash); self.metrics.on_head_deactivated(); } - if !self.active_leaves.contains(&(block.hash, block.number)) { + if self.active_leaves.get(&block.hash).is_none() { update.activated.push(block.hash); - self.active_leaves.insert((block.hash, block.number)); - self.metrics.on_head_activated(); + self.active_leaves.insert(block.hash, block.number); + self.on_head_activated(&block.hash); } self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; @@ -939,7 +973,7 @@ where let mut update = ActiveLeavesUpdate::default(); let metrics = &self.metrics; - self.active_leaves.retain(|(h, n)| { + self.active_leaves.retain(|h, n| { if *n <= block.number { update.deactivated.push(*h); metrics.on_head_deactivated(); @@ -1100,6 +1134,29 @@ where } } + fn on_head_activated(&mut self, hash: &Hash) { + self.metrics.on_head_activated(); + if let Some(listeners) = self.activation_external_listeners.remove(&hash) { + for listener in listeners { + // it's fine if the listener is no longer interested + let _ = listener.send(()); + } + } + } + + fn handle_external_request(&mut self, request: ExternalRequest) { + match request { + ExternalRequest::WaitForActivation { hash, response_channel } => { + if self.active_leaves.get(&hash).is_some() { + // it's fine if the listener is no longer interested + let _ = response_channel.send(()); + } else { + self.activation_external_listeners.entry(hash).or_default().push(response_channel); + } + } + } + } + fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) { self.s.spawn(name, j); } From 0ceae22277504997fb0ab2e8bafab65b9a964c52 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 19 Aug 2020 18:42:16 +0200 Subject: [PATCH 02/10] proposer: wait for the hash to be activated --- node/core/proposer/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node/core/proposer/src/lib.rs b/node/core/proposer/src/lib.rs index d43810227ba4..8fd432a1be90 100644 --- a/node/core/proposer/src/lib.rs +++ b/node/core/proposer/src/lib.rs @@ -119,6 +119,10 @@ where let mut provisioner_inherent_data = async move { let (sender, receiver) = futures::channel::oneshot::channel(); + overseer.wait_for_activation(parent_header_hash, sender).await?; + receiver.await.map_err(Error::ClosedChannelFromProvisioner)?; + + let (sender, receiver) = futures::channel::oneshot::channel(); // strictly speaking, we don't _have_ to .await this send_msg before opening the // receiver; it's possible that the response there would be ready slightly before // this call completes. IMO it's not worth the hassle or overhead of spawning a From 307819666afd05a30dab57b4329df226b0ffa7da Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 19 Aug 2020 18:52:43 +0200 Subject: [PATCH 03/10] update comments --- node/overseer/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 0fbfb8f6c647..d4ad30539e42 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -212,7 +212,7 @@ impl OverseerHandler { /// /// The response channel responds if the hash was activated. /// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas, - /// the response channel may never return if the hash was activated before this call. + /// the response channel may never return if the hash was deactivated before this call. /// In this case, it's the caller's responsibility to ensure a timeout is set. pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<()>) -> SubsystemResult<()> { self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation { @@ -425,7 +425,7 @@ pub struct Overseer { events_rx: mpsc::Receiver, /// External listeners waiting for a hash to be in the active-leave set. - // TODO (now): how to clean it up? Use LRUCache? + // TODO (now): how to clean it up? Use LRUCache? Or .retain(|_, v| { v.retain(|c| !c.is_cancelled()); v }) activation_external_listeners: HashMap>>, /// A set of leaves that `Overseer` starts working with. From 1813442d694990ff121eff0acfe325215cec870f Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 20 Aug 2020 12:46:50 +0200 Subject: [PATCH 04/10] overseer: handle unbounded growth of listeners map --- node/overseer/src/lib.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index d4ad30539e42..b09970c72102 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -210,7 +210,7 @@ impl OverseerHandler { /// Wait for a block with the given hash to be in the active-leaves set. /// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals. /// - /// The response channel responds if the hash was activated. + /// The response channel responds if the hash was activated and is closed if the hash was deactivated. /// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas, /// the response channel may never return if the hash was deactivated before this call. /// In this case, it's the caller's responsibility to ensure a timeout is set. @@ -425,7 +425,6 @@ pub struct Overseer { events_rx: mpsc::Receiver, /// External listeners waiting for a hash to be in the active-leave set. - // TODO (now): how to clean it up? Use LRUCache? Or .retain(|_, v| { v.retain(|c| !c.is_cancelled()); v }) activation_external_listeners: HashMap>>, /// A set of leaves that `Overseer` starts working with. @@ -955,7 +954,7 @@ where if let Some(_number) = self.active_leaves.remove(&block.parent_hash) { update.deactivated.push(block.parent_hash); - self.metrics.on_head_deactivated(); + self.on_head_deactivated(block.parent_hash); } if self.active_leaves.get(&block.hash).is_none() { @@ -976,13 +975,16 @@ where self.active_leaves.retain(|h, n| { if *n <= block.number { update.deactivated.push(*h); - metrics.on_head_deactivated(); false } else { true } }); + for deactivated in &update.deactivated { + self.on_head_deactivated(deactivated) + } + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash)).await?; @@ -1144,6 +1146,14 @@ where } } + fn on_head_deactivated(&mut self, hash: &Hash) { + self.metrics.on_head_deactivated(); + if let Some(listeners) = self.activation_external_listeners.remove(&hash) { + // clean up and signal to listeners the block is deactivated + drop(listeners); + } + } + fn handle_external_request(&mut self, request: ExternalRequest) { match request { ExternalRequest::WaitForActivation { hash, response_channel } => { From 6bd826e28ec06fd205f3538cd6a67f15eb43e9b9 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 20 Aug 2020 12:48:57 +0200 Subject: [PATCH 05/10] overseer: fix compilation --- node/overseer/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index b09970c72102..31d227771874 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -954,7 +954,7 @@ where if let Some(_number) = self.active_leaves.remove(&block.parent_hash) { update.deactivated.push(block.parent_hash); - self.on_head_deactivated(block.parent_hash); + self.on_head_deactivated(&block.parent_hash); } if self.active_leaves.get(&block.hash).is_none() { @@ -970,7 +970,6 @@ where async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { let mut update = ActiveLeavesUpdate::default(); - let metrics = &self.metrics; self.active_leaves.retain(|h, n| { if *n <= block.number { From bf834d46144543e07a89626c9934a1882e9153e4 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 20 Aug 2020 12:55:30 +0200 Subject: [PATCH 06/10] overseer: clean up dead listeners --- node/overseer/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 31d227771874..17877b6bbcb0 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -963,6 +963,8 @@ where self.on_head_activated(&block.hash); } + self.clean_up_external_listeners(); + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; Ok(()) @@ -1153,6 +1155,14 @@ where } } + fn clean_up_external_listeners(&mut self) { + self.activation_external_listeners.retain(|_, v| { + // remove dead listeners + v.retain(|c| !c.is_canceled()); + !v.is_empty() + }) + } + fn handle_external_request(&mut self, request: ExternalRequest) { match request { ExternalRequest::WaitForActivation { hash, response_channel } => { From d0173965dfff14fd543372928ac656eace9c258f Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 20 Aug 2020 13:13:54 +0200 Subject: [PATCH 07/10] overseer: cosmetic changes --- node/overseer/src/lib.rs | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 17877b6bbcb0..fadf3d7e14dc 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -322,9 +322,7 @@ impl SubsystemContext for OverseerSubsystemContext { self.tx.send(ToOverseer::SpawnJob { name, s, - }).await?; - - Ok(()) + }).await.map_err(Into::into) } async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) @@ -333,24 +331,18 @@ impl SubsystemContext for OverseerSubsystemContext { self.tx.send(ToOverseer::SpawnBlockingJob { name, s, - }).await?; - - Ok(()) + }).await.map_err(Into::into) } async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SubsystemMessage(msg)).await?; - - Ok(()) + self.tx.send(ToOverseer::SubsystemMessage(msg)).await.map_err(Into::into) } async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> where T: IntoIterator + Send, T::IntoIter: Send { let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); - self.tx.send_all(&mut msgs).await?; - - Ok(()) + self.tx.send_all(&mut msgs).await.map_err(Into::into) } } @@ -893,10 +885,10 @@ where let leaves = std::mem::take(&mut self.leaves); let mut update = ActiveLeavesUpdate::default(); - for leaf in leaves.into_iter() { - update.activated.push(leaf.0); - self.active_leaves.insert(leaf.0, leaf.1); - self.on_head_activated(&leaf.0); + for (hash, number) in leaves.into_iter() { + update.activated.push(hash); + self.active_leaves.insert(hash, number); + self.on_head_activated(&hash); } self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; @@ -1139,7 +1131,7 @@ where fn on_head_activated(&mut self, hash: &Hash) { self.metrics.on_head_activated(); - if let Some(listeners) = self.activation_external_listeners.remove(&hash) { + if let Some(listeners) = self.activation_external_listeners.remove(hash) { for listener in listeners { // it's fine if the listener is no longer interested let _ = listener.send(()); @@ -1149,7 +1141,7 @@ where fn on_head_deactivated(&mut self, hash: &Hash) { self.metrics.on_head_deactivated(); - if let Some(listeners) = self.activation_external_listeners.remove(&hash) { + if let Some(listeners) = self.activation_external_listeners.remove(hash) { // clean up and signal to listeners the block is deactivated drop(listeners); } From 9d9e23420b94ffa99ca1de64e103940166c3e36d Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 20 Aug 2020 13:15:01 +0200 Subject: [PATCH 08/10] overseer: cosmetic changes t.2 --- node/overseer/src/lib.rs | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index fadf3d7e14dc..09aca23b3d60 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -188,23 +188,17 @@ pub struct OverseerHandler { impl OverseerHandler { /// Inform the `Overseer` that that some block was imported. pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { - self.events_tx.send(Event::BlockImported(block)).await?; - - Ok(()) + self.events_tx.send(Event::BlockImported(block)).await.map_err(Into::into) } /// Send some message to one of the `Subsystem`s. pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> { - self.events_tx.send(Event::MsgToSubsystem(msg)).await?; - - Ok(()) + self.events_tx.send(Event::MsgToSubsystem(msg)).await.map_err(Into::into) } /// Inform the `Overseer` that that some block was finalized. pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { - self.events_tx.send(Event::BlockFinalized(block)).await?; - - Ok(()) + self.events_tx.send(Event::BlockFinalized(block)).await.map_err(Into::into) } /// Wait for a block with the given hash to be in the active-leaves set. @@ -218,16 +212,12 @@ impl OverseerHandler { self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation { hash, response_channel - })).await?; - - Ok(()) + })).await.map_err(Into::into) } /// Tell `Overseer` to shutdown. pub async fn stop(&mut self) -> SubsystemResult<()> { - self.events_tx.send(Event::Stop).await?; - - Ok(()) + self.events_tx.send(Event::Stop).await.map_err(Into::into) } } From b21fafab40718c9a01305e0c2a747caaa92f7771 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 20 Aug 2020 15:07:27 +0200 Subject: [PATCH 09/10] overseer: add debug_assertions --- node/overseer/src/lib.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 09aca23b3d60..ea1231bec429 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -59,7 +59,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use std::collections::HashMap; +use std::collections::{hash_map, HashMap}; use futures::channel::{mpsc, oneshot}; use futures::{ @@ -934,16 +934,24 @@ where async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { let mut update = ActiveLeavesUpdate::default(); - if let Some(_number) = self.active_leaves.remove(&block.parent_hash) { + if let Some(number) = self.active_leaves.remove(&block.parent_hash) { + if let Some(expected_parent_number) = block.number.checked_sub(1) { + debug_assert_eq!(expected_parent_number, number); + } update.deactivated.push(block.parent_hash); self.on_head_deactivated(&block.parent_hash); } - if self.active_leaves.get(&block.hash).is_none() { - update.activated.push(block.hash); - self.active_leaves.insert(block.hash, block.number); - self.on_head_activated(&block.hash); - } + match self.active_leaves.entry(block.hash) { + hash_map::Entry::Vacant(entry) => { + update.activated.push(block.hash); + entry.insert(block.number); + self.on_head_activated(&block.hash); + }, + hash_map::Entry::Occupied(entry) => { + debug_assert_eq!(*entry.get(), block.number); + } + } self.clean_up_external_listeners(); From 47db099ef4953d88a4503b48098c3a99c080b4e7 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 20 Aug 2020 15:08:12 +0200 Subject: [PATCH 10/10] overseer: fix formatting --- node/overseer/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index ea1231bec429..6ff081f652ea 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -951,7 +951,7 @@ where hash_map::Entry::Occupied(entry) => { debug_assert_eq!(*entry.get(), block.number); } - } + } self.clean_up_external_listeners();