Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Block ActiveLeavesUpdate and BlockFinalized events in overseer until major sync is complete #6689

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ orchestra = "0.0.4"
gum = { package = "tracing-gum", path = "../gum" }
lru = "0.9"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
async-trait = "0.1.57"
tikv-jemalloc-ctl = "0.5.0"

Expand Down
22 changes: 14 additions & 8 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use polkadot_overseer::{
self as overseer,
dummy::dummy_overseer_builder,
gen::{FromOrchestra, SpawnedSubsystem},
HeadSupportsParachains, SubsystemError,
HeadSupportsParachains, MajorSyncOracle, SubsystemError,
};
use polkadot_primitives::{CandidateReceipt, Hash};

Expand Down Expand Up @@ -153,13 +153,19 @@ fn main() {
Delay::new(Duration::from_secs(1)).await;
});

let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
.unwrap()
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig)
.replace_candidate_backing(|_orig| Subsystem1)
.build()
.unwrap();
let (overseer, _handle) = dummy_overseer_builder(
spawner,
AlwaysSupportsParachains,
None,
MajorSyncOracle::new_dummy(),
Vec::new(),
)
.unwrap()
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig)
.replace_candidate_backing(|_orig| Subsystem1)
.build()
.unwrap();

let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;
Expand Down
22 changes: 17 additions & 5 deletions node/overseer/src/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{
prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MetricsTrait,
Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue,
prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MajorSyncOracle,
MetricsTrait, Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue,
KNOWN_LEAVES_CACHE_SIZE,
};
use lru::LruCache;
use orchestra::{FromOrchestra, SpawnedSubsystem, Subsystem, SubsystemContext};
use polkadot_node_subsystem_types::{errors::SubsystemError, messages::*};
use polkadot_node_subsystem_types::{errors::SubsystemError, messages::*, BlockNumber, Hash};
// Generated dummy messages
use crate::messages::*;

Expand Down Expand Up @@ -63,6 +63,8 @@ pub fn dummy_overseer_builder<Spawner, SupportsParachains>(
spawner: Spawner,
supports_parachains: SupportsParachains,
registry: Option<&Registry>,
sync_oracle: MajorSyncOracle,
initial_leaves: Vec<(Hash, BlockNumber)>,
) -> Result<
InitializedOverseerBuilder<
SpawnGlue<Spawner>,
Expand Down Expand Up @@ -96,7 +98,14 @@ where
SpawnGlue<Spawner>: orchestra::Spawner + 'static,
SupportsParachains: HeadSupportsParachains,
{
one_for_all_overseer_builder(spawner, supports_parachains, DummySubsystem, registry)
one_for_all_overseer_builder(
spawner,
supports_parachains,
DummySubsystem,
registry,
sync_oracle,
initial_leaves,
)
}

/// Create an overseer with all subsystem being `Sub`.
Expand All @@ -105,6 +114,8 @@ pub fn one_for_all_overseer_builder<Spawner, SupportsParachains, Sub>(
supports_parachains: SupportsParachains,
subsystem: Sub,
registry: Option<&Registry>,
sync_oracle: MajorSyncOracle,
initial_leaves: Vec<(Hash, BlockNumber)>,
) -> Result<
InitializedOverseerBuilder<
SpawnGlue<Spawner>,
Expand Down Expand Up @@ -190,9 +201,10 @@ where
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.leaves(Default::default())
.leaves(initial_leaves)
.spawner(SpawnGlue(spawner))
.metrics(metrics)
.sync_oracle(sync_oracle)
.supports_parachains(supports_parachains);
Ok(builder)
}
100 changes: 71 additions & 29 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use polkadot_node_subsystem_types::messages::{
DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage,
NetworkBridgeTxMessage, ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
};
use sp_consensus::SyncOracle;

pub use polkadot_node_subsystem_types::{
errors::{SubsystemError, SubsystemResult},
Expand Down Expand Up @@ -330,6 +331,45 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
}
}

/// Represents the internal state of `MajorSyncOracle`
enum OracleState {
Syncing(Box<dyn SyncOracle + Send>),
Done,
}
/// Used to detect if the node is in initial major sync.
/// It's worth mentioning that this is a one way check. Once the initial full sync is complete
/// `MajorSyncOracle` will never return false. The reason is that the struct is meant to be used
/// only during initialization.
pub struct MajorSyncOracle {
state: OracleState,
}

impl MajorSyncOracle {
/// Create `MajorSyncOracle` from `SyncOracle`
pub fn new(sync_oracle: Box<dyn SyncOracle + Send>) -> Self {
Self { state: OracleState::Syncing(sync_oracle) }
}

/// Create dummy `MajorSyncOracle` which always returns true for `finished_syncing`
pub fn new_dummy() -> Self {
Self { state: OracleState::Done }
}

/// Check if node is in major sync
pub fn is_syncing(&mut self) -> bool {
match &mut self.state {
OracleState::Syncing(sync_oracle) =>
if sync_oracle.is_major_syncing() {
true
} else {
self.state = OracleState::Done;
false
},
OracleState::Done => false,
}
}
}

/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s.
///
/// This returns the overseer along with an [`OverseerHandle`] which can
Expand Down Expand Up @@ -629,6 +669,9 @@ pub struct Overseer<SupportsParachains> {

/// Various Prometheus metrics.
pub metrics: OverseerMetrics,

/// SyncOracle is used to detect when initial full node sync is complete
pub sync_oracle: MajorSyncOracle,
}

/// Spawn the metrics metronome task.
Expand Down Expand Up @@ -725,14 +768,10 @@ where
let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;

// Notify about active leaves on startup before starting the loop
// Import the active leaves found in the database but don't broadcast ActiveLeaves for them
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
let update =
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
self.on_head_activated(&hash, None).await;
}

loop {
Expand All @@ -746,12 +785,28 @@ where
Event::Stop => {
self.stop().await;
return Ok(());
}
},
Event::BlockImported(block) if self.sync_oracle.is_syncing() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we still need the oracle? If I understand correctly, we don't get any BlockImported events anyway during sync.

// If we are syncing - don't broadcast ActiveLeaves
self.block_imported(block).await;
},
Event::BlockImported(block) => {
self.block_imported(block).await?;
}
// Syncing is complete - broadcast ActiveLeaves
let update = self.block_imported(block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
},
Event::BlockFinalized(block) if self.sync_oracle.is_syncing() => {
// Still syncing - just import the block and don't broadcast anything
self.block_finalized(&block).await;
},
Event::BlockFinalized(block) => {
self.block_finalized(block).await?;
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
}
Event::ExternalRequest(request) => {
self.handle_external_request(request);
Expand Down Expand Up @@ -781,12 +836,12 @@ where
}
}

async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
async fn block_imported(&mut self, block: BlockInfo) -> ActiveLeavesUpdate {
match self.active_leaves.entry(block.hash) {
hash_map::Entry::Vacant(entry) => entry.insert(block.number),
hash_map::Entry::Occupied(entry) => {
debug_assert_eq!(*entry.get(), block.number);
return Ok(())
return ActiveLeavesUpdate::default()
},
};

Expand All @@ -808,13 +863,10 @@ where

self.clean_up_external_listeners();

if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
Ok(())
update
}

async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
async fn block_finalized(&mut self, block: &BlockInfo) -> ActiveLeavesUpdate {
let mut update = ActiveLeavesUpdate::default();

self.active_leaves.retain(|h, n| {
Expand All @@ -832,17 +884,7 @@ where
self.on_head_deactivated(deactivated)
}

self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
.await?;

// If there are no leaves being deactivated, we don't need to send an update.
//
// Our peers will be informed about our finalized block the next time we activating/deactivating some leaf.
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}

Ok(())
update
}

/// Handles a header activation. If the header's state doesn't support the parachains API,
Expand All @@ -861,7 +903,7 @@ where
gum::trace!(
target: LOG_TARGET,
relay_parent = ?hash,
"Leaf got activated, notifying exterinal listeners"
"Leaf got activated, notifying external listeners"
);
for listener in listeners {
// it's fine if the listener is no longer interested
Expand Down
Loading