Block ActiveLeavesUpdate and BlockFinalized events in overseer until major sync is complete #6689

Closed · wants to merge 19 commits
Changes from 4 commits
1 change: 1 addition & 0 deletions Cargo.lock
(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions node/overseer/Cargo.toml
@@ -19,6 +19,7 @@ orchestra = "0.0.4"
gum = { package = "tracing-gum", path = "../gum" }
lru = "0.9"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
async-trait = "0.1.57"
tikv-jemalloc-ctl = "0.5.0"

5 changes: 3 additions & 2 deletions node/overseer/src/dummy.rs
@@ -15,8 +15,8 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{
prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MetricsTrait,
Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue,
prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MajorSyncOracle,
MetricsTrait, Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue,
KNOWN_LEAVES_CACHE_SIZE,
};
use lru::LruCache;
@@ -193,6 +193,7 @@ where
.leaves(Default::default())
.spawner(SpawnGlue(spawner))
.metrics(metrics)
.sync_oracle(MajorSyncOracle::new_dummy())
.supports_parachains(supports_parachains);
Ok(builder)
}
230 changes: 173 additions & 57 deletions node/overseer/src/lib.rs
@@ -83,6 +83,7 @@ use polkadot_node_subsystem_types::messages::{
DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage,
NetworkBridgeTxMessage, ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
};
use sp_consensus::SyncOracle;

pub use polkadot_node_subsystem_types::{
errors::{SubsystemError, SubsystemResult},
@@ -330,6 +331,40 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
}
}

/// Used to detect whether the node is in its initial major sync.
/// Note that this is a one-way check: once the initial full sync is complete,
/// `finished_syncing` will never return `false` again, because the struct is meant to be used
/// only during initialization.
pub struct MajorSyncOracle {
sync_oracle: Option<Box<dyn SyncOracle + Send>>,
}

impl MajorSyncOracle {
/// Create a `MajorSyncOracle` from a `SyncOracle`
pub fn new(sync_oracle: Box<dyn SyncOracle + Send>) -> Self {
Self { sync_oracle: Some(sync_oracle) }
}

/// Create a dummy `MajorSyncOracle` whose `finished_syncing` always returns `true`
pub fn new_dummy() -> Self {
Self { sync_oracle: None }
}

/// Check whether the node has finished its initial major sync
pub fn finished_syncing(&mut self) -> bool {
match &mut self.sync_oracle {
Some(sync_oracle) =>
if !sync_oracle.is_major_syncing() {
self.sync_oracle = None;
true
} else {
false
},
None => true,
}
}
}
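
As an illustrative aside (not part of the diff): the one-way latch above is easy to exercise in isolation. A minimal sketch, assuming `sp_consensus::SyncOracle` exposes `is_major_syncing()` and `is_offline()` as on Substrate master at the time, with a hypothetical `MockSync` test double:

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use polkadot_overseer::MajorSyncOracle;
use sp_consensus::SyncOracle;

/// Test double: reports "major syncing" while the shared flag is set.
struct MockSync(Arc<AtomicBool>);

impl SyncOracle for MockSync {
    fn is_major_syncing(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
    fn is_offline(&self) -> bool {
        false
    }
}

fn main() {
    let syncing = Arc::new(AtomicBool::new(true));
    let mut oracle = MajorSyncOracle::new(Box::new(MockSync(syncing.clone())));

    assert!(!oracle.finished_syncing()); // still in major sync

    syncing.store(false, Ordering::Relaxed);
    assert!(oracle.finished_syncing()); // sync done; the inner oracle is dropped

    // One-way check: even if the network later reports syncing again,
    // `finished_syncing` keeps returning `true`.
    syncing.store(true, Ordering::Relaxed);
    assert!(oracle.finished_syncing());
}
```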

/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s.
///
/// This returns the overseer along with an [`OverseerHandle`] which can
@@ -629,6 +664,9 @@ pub struct Overseer<SupportsParachains> {

/// Various Prometheus metrics.
pub metrics: OverseerMetrics,

/// `SyncOracle` used to detect when the initial full node sync is complete
pub sync_oracle: MajorSyncOracle,
}

/// Spawn the metrics metronome task.
@@ -725,68 +763,159 @@ where
let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;

// Notify about active leaves on startup before starting the loop
let initial_sync_finished = self.sync_oracle.finished_syncing();
// Import the active leaves found in the database
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
let update =
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
if initial_sync_finished {
// Initial sync is complete. Notify the subsystems and proceed to the main loop
let update = ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash,
number,
status,
span,
});
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
}
}

loop {
// If the initial sync is not complete - wait for it.
// This loop is identical to the main one but doesn't broadcast `ActiveLeaves` and
// `BlockFinalized` signals until the initial full sync is complete.
// It runs only while `!initial_sync_finished` and is left via `break` once the sync
// completes. The unusual syntax just saves one extra layer of indentation, which is
// already a bit tough on the eyes.
// Think about it like:
// ```
// if !initial_sync_finished {
// loop {
// select! {
// ...
// }
// }
// }
//```
while !initial_sync_finished {
select! {
msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem { msg, origin } => {
self.route_message(msg.into(), origin).await?;
self.metrics.on_message_relayed();
}
Event::Stop => {
self.stop().await;
return Ok(());
}
Event::BlockImported(block) => {
self.block_imported(block).await?;
}
Event::BlockFinalized(block) => {
self.block_finalized(block).await?;
}
Event::ExternalRequest(request) => {
self.handle_external_request(request);
}
Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?,
Event::Stop => return self.handle_stop().await,
Member: A concern: I think with the current impl, Ctrl-C before major sync is complete should not stall/deadlock subsystems, but it would be nice to double check that.

Member: How should this stall the subsystems?
Event::BlockImported(block) => _ = self.block_imported(block).await,
Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => {
// Initial sync is complete
self.block_finalized(&block).await;
// Send initial `ActiveLeaves`
for (hash, number) in self.active_leaves.clone().into_iter() {
let span = match self.span_per_active_leaf.get(&hash) {
Some(span) => span.clone(),
None => {
// This should never happen. Spans are generated in `on_head_activated`,
// which is called from `block_imported`. Even though no signal is sent
// while waiting, `BlockImported` events are still handled, so a span
// should exist for each active leaf.
gum::error!(
target: LOG_TARGET,
?hash,
?number,
"Span for active leaf not found. This is not expected"
);
let span = Arc::new(jaeger::Span::new(hash, "leaf-activated"));
self.span_per_active_leaf.insert(hash, span.clone());
span
}
};

let update = ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash,
number,
status: LeafStatus::Fresh,
span,
});
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
// Send initial `BlockFinalized`
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
break
},
Event::BlockFinalized(block) => _ = self.block_finalized(&block).await,
Event::ExternalRequest(request) => self.handle_external_request(request)
}
},
msg = self.to_orchestra_rx.select_next_some() => {
msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg),
res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await
}
}

// main loop
loop {
select! {
msg = self.events_rx.select_next_some() => {
match msg {
ToOrchestra::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
}
ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
}
Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?,
Event::Stop => return self.handle_stop().await,
Event::BlockImported(block) => {
let update = self.block_imported(block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
},
Event::BlockFinalized(block) => _ = self.block_finalized(&block).await,
Event::ExternalRequest(request) => self.handle_external_request(request)
}
},
res = self.running_subsystems.select_next_some() => {
gum::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return res;
},
msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg),
res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await
}
}
}

async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
async fn handle_stop(self) -> Result<(), SubsystemError> {
self.stop().await;
return Ok(())
}

fn handle_orchestra_rx(&mut self, msg: ToOrchestra) {
match msg {
ToOrchestra::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
},
ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
},
}
}

async fn handle_msg_to_subsystem(
&mut self,
msg: AllMessages,
origin: &'static str,
) -> Result<(), SubsystemError> {
self.route_message(msg.into(), origin).await?;
self.metrics.on_message_relayed();
Ok(())
}

async fn handle_running_subsystems(
self,
res: Result<(), SubsystemError>,
) -> Result<(), SubsystemError> {
gum::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return res
}

async fn block_imported(&mut self, block: BlockInfo) -> ActiveLeavesUpdate {
match self.active_leaves.entry(block.hash) {
hash_map::Entry::Vacant(entry) => entry.insert(block.number),
hash_map::Entry::Occupied(entry) => {
debug_assert_eq!(*entry.get(), block.number);
return Ok(())
return ActiveLeavesUpdate::default()
},
};

@@ -808,13 +937,10 @@ where

self.clean_up_external_listeners();

if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
Ok(())
update
}

async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
async fn block_finalized(&mut self, block: &BlockInfo) -> ActiveLeavesUpdate {
let mut update = ActiveLeavesUpdate::default();

self.active_leaves.retain(|h, n| {
@@ -832,17 +958,7 @@
self.on_head_deactivated(deactivated)
}

self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
.await?;

// If there are no leaves being deactivated, we don't need to send an update.
//
// Our peers will be informed about our finalized block the next time we activating/deactivating some leaf.
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}

Ok(())
update
}

/// Handles a header activation. If the header's state doesn't support the parachains API,
@@ -861,7 +977,7 @@
gum::trace!(
target: LOG_TARGET,
relay_parent = ?hash,
"Leaf got activated, notifying exterinal listeners"
"Leaf got activated, notifying external listeners"
);
for listener in listeners {
// it's fine if the listener is no longer interested
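Taken together, the new run loop is two-phase: a pre-sync loop that keeps bookkeeping (active leaves, spans, finality pruning) up to date while suppressing `ActiveLeaves` and `BlockFinalized` signals, and the main loop it breaks into at the first finality notification after major sync. A self-contained model of that gating, with simplified hypothetical `Event`/`Signal` types, not the overseer's own (the real loop also routes messages, spawns jobs, and watches subsystem exits):

```rust
#[derive(Debug, PartialEq)]
enum Event {
    BlockImported(u32),
    BlockFinalized(u32),
}

#[derive(Debug, PartialEq)]
enum Signal {
    ActiveLeaves(u32),
    BlockFinalized(u32),
}

/// Replays `events`, suppressing outgoing signals until `synced()` first
/// returns `true` at a finality notification, mirroring the pre-sync loop.
fn run(events: Vec<Event>, mut synced: impl FnMut() -> bool) -> Vec<Signal> {
    let mut signals = Vec::new();
    let mut leaves: Vec<u32> = Vec::new();
    let mut waiting = !synced();

    for event in events {
        match event {
            // Bookkeeping always happens, even while waiting.
            Event::BlockImported(n) => {
                leaves.push(n);
                if !waiting {
                    signals.push(Signal::ActiveLeaves(n));
                }
            },
            Event::BlockFinalized(n) => {
                if waiting && synced() {
                    // Major sync just finished: flush the leaves collected so
                    // far, then send the first finality signal.
                    signals.extend(leaves.iter().map(|&l| Signal::ActiveLeaves(l)));
                    signals.push(Signal::BlockFinalized(n));
                    waiting = false;
                } else if !waiting {
                    signals.push(Signal::BlockFinalized(n));
                }
                // else: still in major sync, swallow the signal entirely.
            },
        }
    }
    signals
}

fn main() {
    let mut checks = 0;
    // Pretend the node leaves major sync after the second oracle check.
    let synced = move || {
        checks += 1;
        checks > 2
    };

    let signals = run(
        vec![
            Event::BlockImported(1),
            Event::BlockFinalized(1), // swallowed: still major-syncing
            Event::BlockImported(2),
            Event::BlockFinalized(2), // first finality after sync completes
        ],
        synced,
    );

    assert_eq!(
        signals,
        vec![
            Signal::ActiveLeaves(1),
            Signal::ActiveLeaves(2),
            Signal::BlockFinalized(2),
        ]
    );
}
```
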
3 changes: 2 additions & 1 deletion node/service/src/lib.rs
@@ -49,7 +49,7 @@ use {
polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames,
},
polkadot_overseer::BlockInfo,
polkadot_overseer::{BlockInfo, MajorSyncOracle},
sc_client_api::BlockBackend,
sp_core::traits::SpawnNamed,
sp_trie::PrefixedMemoryDB,
@@ -1091,6 +1091,7 @@
overseer_message_channel_capacity_override,
req_protocol_names,
peerset_protocol_names,
sync_oracle: MajorSyncOracle::new(Box::new(network.clone())),
},
)
.map_err(|e| {
8 changes: 6 additions & 2 deletions node/service/src/overseer.rs
@@ -34,8 +34,8 @@ pub use polkadot_overseer::{
HeadSupportsParachains,
};
use polkadot_overseer::{
metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait,
Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MajorSyncOracle,
MetricsTrait, Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
};

use polkadot_primitives::runtime_api::ParachainHost;
@@ -125,6 +125,8 @@ where
pub req_protocol_names: ReqProtocolNames,
/// [`PeerSet`] protocol names to protocols mapping.
pub peerset_protocol_names: PeerSetProtocolNames,
/// `SyncOracle` used to detect when the initial full node sync is complete
pub sync_oracle: MajorSyncOracle,
}

/// Obtain a prepared `OverseerBuilder`, that is initialized
@@ -155,6 +157,7 @@ pub fn prepared_overseer_builder<Spawner, RuntimeClient>(
overseer_message_channel_capacity_override,
req_protocol_names,
peerset_protocol_names,
sync_oracle,
}: OverseerGenArgs<Spawner, RuntimeClient>,
) -> Result<
InitializedOverseerBuilder<
@@ -319,6 +322,7 @@ where
.supports_parachains(runtime_client)
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.metrics(metrics)
.sync_oracle(sync_oracle)
.spawner(spawner);

if let Some(capacity) = overseer_message_channel_capacity_override {