diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index c4f6256e0e4e..afe6ded73df7 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -356,16 +356,16 @@ impl MajorSyncOracle { } /// Check if node is in major sync - pub fn finished_syncing(&mut self) -> bool { + pub fn is_syncing(&mut self) -> bool { match &mut self.state { OracleState::Syncing(sync_oracle) => - if !sync_oracle.is_major_syncing() { - self.state = OracleState::Done; + if sync_oracle.is_major_syncing() { true } else { + self.state = OracleState::Done; false }, - OracleState::Done => true, + OracleState::Done => false, } } } @@ -765,32 +765,13 @@ where } async fn run_inner(mut self) -> SubsystemResult<()> { - enum MainLoopState { - Syncing, - Running, - } - let mut main_loop_state = MainLoopState::Syncing; - let metrics = self.metrics.clone(); spawn_metronome_metrics(&mut self, metrics)?; - let initial_sync_finished = self.sync_oracle.finished_syncing(); - // Import the active leaves found in the database + // Import the active leaves found in the database but don't broadcast ActiveLeaves for them for (hash, number) in std::mem::take(&mut self.leaves) { let _ = self.active_leaves.insert(hash, number); - if let Some((span, status)) = self.on_head_activated(&hash, None).await { - if initial_sync_finished { - // Initial sync is complete. Notify the subsystems and proceed to the main loop - let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash, - number, - status, - span, - }); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - main_loop_state = MainLoopState::Running; - } - } + self.on_head_activated(&hash, None).await; } // main loop @@ -800,66 +781,28 @@ where match msg { Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, Event::Stop => return self.handle_stop().await, + Event::BlockImported(block) if self.sync_oracle.is_syncing() => { + // If we are syncing - don't broadcast ActiveLeaves + self.block_imported(block).await; + }, Event::BlockImported(block) => { - match main_loop_state { - MainLoopState::Syncing => { - self.block_imported(block).await; - }, - MainLoopState::Running => { - let update = self.block_imported(block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - } + // Syncing is complete - broadcast ActiveLeaves + let update = self.block_imported(block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } - + }, + Event::BlockFinalized(block) if self.sync_oracle.is_syncing() => { + // Still syncing - just import the block and don't broadcast anything + self.block_finalized(&block).await; }, Event::BlockFinalized(block) => { - match main_loop_state { - MainLoopState::Syncing => { - if self.sync_oracle.finished_syncing() { - // Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. Most of the - // subsystems start doing work when they receive their first `ActiveLeaves` event. We need - // to ensure such event is sent no matter what. - // We can also wait for the next leaf to be activated which is safe in probably 99% of the - // cases. However if the finality is stalled for some reason and a new node is started or - // restarted its subsystems won't start without the logic here. - // - // To force an `ActiveLeaves` event first we do an artificial `block_import`. - let update = self.block_imported(block.clone()).await; - if !update.is_empty() { - // it might yield an `ActiveLeaves` update. If so - send it. - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } else { - // if not - prepare one manually. All the required state should be already available. - let update = self.prepare_initial_active_leaves(&block); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - - // Now process finalized event. It will probably yield an `ActiveLeaves` which will - // deactivate the heads activated by the previous event in this scope, so broadcast it. - let update = self.block_finalized(&block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - // And finally broadcast `BlockFinalized` - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - main_loop_state = MainLoopState::Running; - } else { - self.block_finalized(&block).await; - } - }, - MainLoopState::Running => { - let update = self.block_finalized(&block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - } + let update = self.block_finalized(&block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } - }, + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + } Event::ExternalRequest(request) => self.handle_external_request(request) } }, @@ -869,32 +812,6 @@ where } } - fn prepare_initial_active_leaves(&self, block: &BlockInfo) -> ActiveLeavesUpdate { - // In theory we can receive `BlockImported` during initial major sync. In this case the - // update will be empty. - let span = match self.span_per_active_leaf.get(&block.hash) { - Some(span) => span.clone(), - None => { - // This should never happen. - gum::warn!( - target: LOG_TARGET, - ?block.hash, - ?block.number, - "Span for initial active leaf not found. This is not expected" - ); - let span = Arc::new(jaeger::Span::new(block.hash, "leaf-activated")); - span - }, - }; - - ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: block.hash, - number: block.number, - status: LeafStatus::Fresh, - span, - }) - } - async fn handle_stop(self) -> Result<(), SubsystemError> { self.stop().await; Ok(()) diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 59d38cf555f0..3f5edc14ac13 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -447,12 +447,6 @@ fn overseer_start_stop_works() { handle.block_imported(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated: Some(ActivatedLeaf { hash: second_block_hash, @@ -552,18 +546,6 @@ fn overseer_finalize_works() { handle.block_finalized(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: second_block_hash, - number: 2, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { deactivated: [first_block_hash, second_block_hash].as_ref().into(), ..Default::default() @@ -655,18 +637,6 @@ fn overseer_finalize_leaf_preserves_it() { handle.block_finalized(first_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: second_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { deactivated: [second_block_hash].as_ref().into(), ..Default::default() @@ -754,12 +724,6 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { span: Arc::new(jaeger::Span::Disabled), status: LeafStatus::Fresh, })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: finalized_block.hash, - number: finalized_block.number, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::BlockFinalized(finalized_block.hash, 1), ]; @@ -892,85 +856,6 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { }); } -#[test] -fn active_leaves_is_sent_on_stalled_finality() { - let spawner = sp_core::testing::TaskExecutor::new(); - let (tx_5, mut rx_5) = metered::channel(64); - let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(true)); - - // Prepare overseer - let (overseer, handle) = dummy_overseer_builder( - spawner, - MockSupportsParachains, - None, - MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), - Vec::new(), - ) - .unwrap() - .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) - .build() - .unwrap(); - - // Prepare overseer future - let mut handle = Handle::new(handle); - let overseer_fut = overseer.run_inner().fuse(); - pin_mut!(overseer_fut); - - // The test logic. It will run concurrently with the overseer. - // The test simulates a node (re)started during a finality stall. Overseer starts, receives one - // `block_finalized` event. This is equivalent to starting the node and syncing state. However - // after that no `block_imported` events are received. Despite that overseer should generate one - // `ActiveLeaves` event for the last finalized block and broadcast it to the subsystems - let test = async move { - let finalized_block_after_sync = - BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 }; - - let mut ss5_results = Vec::new(); - - // Switch the oracle state - initial sync is completed - *is_syncing.lock().unwrap() = false; - - // Generate 'block imported' event which simulates end of sync - handle.block_finalized(finalized_block_after_sync.clone()).await; - - let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: finalized_block_after_sync.hash, - number: finalized_block_after_sync.number, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::BlockFinalized( - finalized_block_after_sync.hash, - finalized_block_after_sync.number, - ), - ]; - - loop { - let res = rx_5.next().timeout(Duration::from_millis(100)).await; - assert_matches!(res, Some(res) => { - if let Some(res) = dbg!(res) { - ss5_results.push(res); - } - }); - - if ss5_results.len() == expected_heartbeats.len() { - handle.stop().await; - break - } - } - - assert_eq!(ss5_results.len(), expected_heartbeats.len()); - - for expected in expected_heartbeats { - assert!(ss5_results.contains(&expected)); - } - }; - - let (res, _) = executor::block_on(futures::future::join(overseer_fut, test)); - assert_matches!(res, Ok(())); -} - #[test] fn initial_leaves_are_sent_if_nothing_to_sync() { let spawner = sp_core::testing::TaskExecutor::new(); @@ -999,16 +884,39 @@ fn initial_leaves_are_sent_if_nothing_to_sync() { let overseer_fut = overseer.run_inner().fuse(); pin_mut!(overseer_fut); + // Run overseer for 500ms to check if any events are generated + loop { + select! { + _ = overseer_fut => { + assert!(false, "Overseer should not exit"); + }, + res = rx_5.next().timeout(Duration::from_millis(500)).fuse() => { + assert_matches!(res, None); + break; + } + } + } + + // Now complete sync and generate an event just to be sure everything works + let imported_block_after_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 2 }; + *is_syncing.lock().unwrap() = false; + + // Finalize one of the leaves from db and import one new + handle.block_finalized(db_leaves.clone()).await; + handle.block_imported(imported_block_after_sync.clone()).await; + let mut ss5_results = Vec::new(); - // Generate two more events - these must not be ignore - let expected_heartbeats = - vec![OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: db_leaves.hash, - number: db_leaves.number, + let expected_heartbeats = vec![ + OverseerSignal::BlockFinalized(db_leaves.hash, db_leaves.number), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: imported_block_after_sync.hash, + number: imported_block_after_sync.number, span: Arc::new(jaeger::Span::Disabled), status: LeafStatus::Fresh, - }))]; + })), + ]; loop { select! { @@ -1267,7 +1175,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { // send a signal to each subsystem handle - .block_finalized(BlockInfo { + .block_imported(BlockInfo { hash: Default::default(), parent_hash: Default::default(), number: Default::default(), @@ -1351,7 +1259,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { let res = overseer_fut.await; assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS * 2); + assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS_MESSAGED); assert!(res.is_ok());