Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Don't send ActiveLeaves on startup; do it when the first fresh leaf…
Browse files Browse the repository at this point in the history
… is imported
  • Loading branch information
tdimitrov committed Feb 15, 2023
1 parent ba1167b commit 809a612
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 229 deletions.
129 changes: 23 additions & 106 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,16 +356,16 @@ impl MajorSyncOracle {
}

/// Check if node is in major sync
pub fn finished_syncing(&mut self) -> bool {
pub fn is_syncing(&mut self) -> bool {
match &mut self.state {
OracleState::Syncing(sync_oracle) =>
if !sync_oracle.is_major_syncing() {
self.state = OracleState::Done;
if sync_oracle.is_major_syncing() {
true
} else {
self.state = OracleState::Done;
false
},
OracleState::Done => true,
OracleState::Done => false,
}
}
}
Expand Down Expand Up @@ -765,32 +765,13 @@ where
}

async fn run_inner(mut self) -> SubsystemResult<()> {
enum MainLoopState {
Syncing,
Running,
}
let mut main_loop_state = MainLoopState::Syncing;

let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;

let initial_sync_finished = self.sync_oracle.finished_syncing();
// Import the active leaves found in the database
// Import the active leaves found in the database but don't broadcast ActiveLeaves for them
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
if initial_sync_finished {
// Initial sync is complete. Notify the subsystems and proceed to the main loop
let update = ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash,
number,
status,
span,
});
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
main_loop_state = MainLoopState::Running;
}
}
self.on_head_activated(&hash, None).await;
}

// main loop
Expand All @@ -800,66 +781,28 @@ where
match msg {
Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?,
Event::Stop => return self.handle_stop().await,
Event::BlockImported(block) if self.sync_oracle.is_syncing() => {
// If we are syncing - don't broadcast ActiveLeaves
self.block_imported(block).await;
},
Event::BlockImported(block) => {
match main_loop_state {
MainLoopState::Syncing => {
self.block_imported(block).await;
},
MainLoopState::Running => {
let update = self.block_imported(block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
}
// Syncing is complete - broadcast ActiveLeaves
let update = self.block_imported(block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}

},
Event::BlockFinalized(block) if self.sync_oracle.is_syncing() => {
// Still syncing - just import the block and don't broadcast anything
self.block_finalized(&block).await;
},
Event::BlockFinalized(block) => {
match main_loop_state {
MainLoopState::Syncing => {
if self.sync_oracle.finished_syncing() {
// Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. Most of the
// subsystems start doing work when they receive their first `ActiveLeaves` event. We need
// to ensure such event is sent no matter what.
// We can also wait for the next leaf to be activated which is safe in probably 99% of the
// cases. However if the finality is stalled for some reason and a new node is started or
// restarted its subsystems won't start without the logic here.
//
// To force an `ActiveLeaves` event first we do an artificial `block_import`.
let update = self.block_imported(block.clone()).await;
if !update.is_empty() {
// it might yield an `ActiveLeaves` update. If so - send it.
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
} else {
// if not - prepare one manually. All the required state should be already available.
let update = self.prepare_initial_active_leaves(&block);
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}


// Now process finalized event. It will probably yield an `ActiveLeaves` which will
// deactivate the heads activated by the previous event in this scope, so broadcast it.
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}

// And finally broadcast `BlockFinalized`
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
main_loop_state = MainLoopState::Running;
} else {
self.block_finalized(&block).await;
}
},
MainLoopState::Running => {
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
}
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
},
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
}
Event::ExternalRequest(request) => self.handle_external_request(request)
}
},
Expand All @@ -869,32 +812,6 @@ where
}
}

/// Builds an `ActiveLeavesUpdate` that activates `block` as a `LeafStatus::Fresh` leaf.
///
/// The span attached to the leaf is the one stored in `span_per_active_leaf` under
/// `block.hash`. If no span is stored for that hash, a warning is logged and a new
/// `"leaf-activated"` span is created for the block instead.
fn prepare_initial_active_leaves(&self, block: &BlockInfo) -> ActiveLeavesUpdate {
    // In theory we can receive `BlockImported` during initial major sync. In this case the
    // update will be empty.
    let span = self
        .span_per_active_leaf
        .get(&block.hash)
        .cloned()
        .unwrap_or_else(|| {
            // This should never happen: fall back to a freshly created span so that the
            // update can still be produced, and leave a trace in the logs.
            gum::warn!(
                target: LOG_TARGET,
                ?block.hash,
                ?block.number,
                "Span for initial active leaf not found. This is not expected"
            );
            Arc::new(jaeger::Span::new(block.hash, "leaf-activated"))
        });

    ActiveLeavesUpdate::start_work(ActivatedLeaf {
        hash: block.hash,
        number: block.number,
        status: LeafStatus::Fresh,
        span,
    })
}

async fn handle_stop(self) -> Result<(), SubsystemError> {
self.stop().await;
Ok(())
Expand Down
154 changes: 31 additions & 123 deletions node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,12 +447,6 @@ fn overseer_start_stop_works() {
handle.block_imported(third_block).await;

let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: second_block_hash,
Expand Down Expand Up @@ -552,18 +546,6 @@ fn overseer_finalize_works() {
handle.block_finalized(third_block).await;

let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
deactivated: [first_block_hash, second_block_hash].as_ref().into(),
..Default::default()
Expand Down Expand Up @@ -655,18 +637,6 @@ fn overseer_finalize_leaf_preserves_it() {
handle.block_finalized(first_block).await;

let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: second_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
deactivated: [second_block_hash].as_ref().into(),
..Default::default()
Expand Down Expand Up @@ -754,12 +724,6 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: finalized_block.hash,
number: finalized_block.number,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::BlockFinalized(finalized_block.hash, 1),
];

Expand Down Expand Up @@ -892,85 +856,6 @@ fn do_not_send_events_before_initial_major_sync_is_complete() {
});
}

/// Simulates a node (re)started during a finality stall.
///
/// The overseer is built with no initial leaves and a `PuppetOracle` that initially reports
/// "syncing". The oracle is then switched to "not syncing" and a single `block_finalized`
/// event is sent; no `block_imported` event follows. The test expects the subsystem plugged
/// in as candidate backing to receive exactly two signals: an `ActiveLeaves` update
/// activating the finalized block as a fresh leaf, and the matching `BlockFinalized`.
#[test]
fn active_leaves_is_sent_on_stalled_finality() {
    let spawner = sp_core::testing::TaskExecutor::new();
    let (tx_5, mut rx_5) = metered::channel(64);
    // Shared flag backing the oracle; `true` means "still in major sync".
    let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(true));

    // Prepare overseer
    let (overseer, handle) = dummy_overseer_builder(
        spawner,
        MockSupportsParachains,
        None,
        MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })),
        Vec::new(),
    )
    .unwrap()
    .replace_candidate_backing(move |_| TestSubsystem6(tx_5))
    .build()
    .unwrap();

    // Prepare overseer future
    let mut handle = Handle::new(handle);
    let overseer_fut = overseer.run_inner().fuse();
    pin_mut!(overseer_fut);

    // The test logic. It will run concurrently with the overseer.
    // The test simulates a node (re)started during a finality stall. Overseer starts, receives one
    // `block_finalized` event. This is equivalent to starting the node and syncing state. However
    // after that no `block_imported` events are received. Despite that overseer should generate one
    // `ActiveLeaves` event for the last finalized block and broadcast it to the subsystems
    let test = async move {
        let finalized_block_after_sync =
            BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 };

        let mut ss5_results = Vec::new();

        // Switch the oracle state - initial sync is completed
        *is_syncing.lock().unwrap() = false;

        // Generate a 'block finalized' event which simulates end of sync
        handle.block_finalized(finalized_block_after_sync.clone()).await;

        let expected_heartbeats = vec![
            OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
                hash: finalized_block_after_sync.hash,
                number: finalized_block_after_sync.number,
                span: Arc::new(jaeger::Span::Disabled),
                status: LeafStatus::Fresh,
            })),
            OverseerSignal::BlockFinalized(
                finalized_block_after_sync.hash,
                finalized_block_after_sync.number,
            ),
        ];

        // Collect signals until the expected number has arrived. A timeout (outer `None`)
        // fails the test through `assert_matches!`.
        loop {
            let res = rx_5.next().timeout(Duration::from_millis(100)).await;
            assert_matches!(res, Some(res) => {
                if let Some(res) = res {
                    ss5_results.push(res);
                }
            });

            if ss5_results.len() == expected_heartbeats.len() {
                handle.stop().await;
                break
            }
        }

        assert_eq!(ss5_results.len(), expected_heartbeats.len());

        // Order is not checked - only that every expected signal was received.
        for expected in expected_heartbeats {
            assert!(ss5_results.contains(&expected));
        }
    };

    let (res, _) = executor::block_on(futures::future::join(overseer_fut, test));
    assert_matches!(res, Ok(()));
}

#[test]
fn initial_leaves_are_sent_if_nothing_to_sync() {
let spawner = sp_core::testing::TaskExecutor::new();
Expand Down Expand Up @@ -999,16 +884,39 @@ fn initial_leaves_are_sent_if_nothing_to_sync() {
let overseer_fut = overseer.run_inner().fuse();
pin_mut!(overseer_fut);

// Run overseer for 500ms to check if any events are generated
loop {
select! {
_ = overseer_fut => {
assert!(false, "Overseer should not exit");
},
res = rx_5.next().timeout(Duration::from_millis(500)).fuse() => {
assert_matches!(res, None);
break;
}
}
}

// Now complete sync and generate an event just to be sure everything works
let imported_block_after_sync =
BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 2 };
*is_syncing.lock().unwrap() = false;

// Finalize one of the leaves from db and import one new
handle.block_finalized(db_leaves.clone()).await;
handle.block_imported(imported_block_after_sync.clone()).await;

let mut ss5_results = Vec::new();

// Generate two more events - these must not be ignored
let expected_heartbeats =
vec![OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: db_leaves.hash,
number: db_leaves.number,
let expected_heartbeats = vec![
OverseerSignal::BlockFinalized(db_leaves.hash, db_leaves.number),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: imported_block_after_sync.hash,
number: imported_block_after_sync.number,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}))];
})),
];

loop {
select! {
Expand Down Expand Up @@ -1267,7 +1175,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() {

// send a signal to each subsystem
handle
.block_finalized(BlockInfo {
.block_imported(BlockInfo {
hash: Default::default(),
parent_hash: Default::default(),
number: Default::default(),
Expand Down Expand Up @@ -1351,7 +1259,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() {

let res = overseer_fut.await;
assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS * 2);
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS_MESSAGED);

assert!(res.is_ok());
Expand Down

0 comments on commit 809a612

Please sign in to comment.