Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
sc-consensus-beefy: fix on-demand async state machine (#13599)
Browse files Browse the repository at this point in the history
`futures_util::pending!()` macro only yields to the event loop once,
resulting in many calls to the `OnDemandJustificationsEngine::next()`
made by the tokio reactor even though the on-demand-engine is idle.

Replace it with the correct `futures::future::pending::<()>()` function
which forever yields to the event loop, which is what we want when
the engine is idle.

Signed-off-by: Adrian Catangiu <adrian@parity.io>
  • Loading branch information
acatangiu authored Mar 14, 2023
1 parent 897b95d commit 39510ec
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::pending!();
futures::future::pending::<()>().await;
return None
},
State::AwaitingResponse(peer, req_info, receiver) => {
Expand Down
23 changes: 15 additions & 8 deletions client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ async fn on_demand_beefy_justification_sync() {
let dave_index = 3;

// push 30 blocks
let hashes = net.generate_blocks_and_sync(35, session_len, &validator_set, false).await;
let mut hashes = net.generate_blocks_and_sync(30, session_len, &validator_set, false).await;

let fast_peers = fast_peers.into_iter().enumerate();
let net = Arc::new(Mutex::new(net));
Expand All @@ -951,8 +951,16 @@ async fn on_demand_beefy_justification_sync() {
// Spawn Dave, they are now way behind voting and can only catch up through on-demand justif
// sync.
tokio::spawn(dave_task);
// give Dave a chance to spawn and init.
run_for(Duration::from_millis(400), &net).await;
// Dave pushes and syncs 4 more blocks just to make sure he gets included in gossip.
{
let mut net_guard = net.lock();
let built_hashes =
net_guard
.peer(dave_index)
.generate_blocks(4, BlockOrigin::File, |builder| builder.build().unwrap().block);
hashes.extend(built_hashes);
net_guard.run_until_sync().await;
}

let (dave_best_blocks, _) =
get_beefy_streams(&mut net.lock(), [(dave_index, BeefyKeyring::Dave)].into_iter());
Expand All @@ -965,7 +973,10 @@ async fn on_demand_beefy_justification_sync() {
// Have the other peers do some gossip so Dave finds out about their progress.
finalize_block_and_wait_for_beefy(&net, fast_peers, &[hashes[25], hashes[29]], &[25, 29]).await;

// Now verify Dave successfully finalized #1 (through on-demand justification request).
// Kick Dave's async loop by finalizing another block.
client.finalize_block(hashes[2], None).unwrap();

// And verify Dave successfully finalized #1 (through on-demand justification request).
wait_for_best_beefy_blocks(dave_best_blocks, &net, &[1]).await;

// Give all tasks some cpu cycles to burn through their events queues,
Expand All @@ -978,10 +989,6 @@ async fn on_demand_beefy_justification_sync() {
&[5, 10, 15, 20, 25],
)
.await;

let all_peers = all_peers.into_iter().enumerate();
// Now that Dave has caught up, sanity check voting works for all of them.
finalize_block_and_wait_for_beefy(&net, all_peers, &[hashes[30], hashes[34]], &[30]).await;
}

#[tokio::test]
Expand Down
10 changes: 5 additions & 5 deletions client/consensus/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,11 +887,6 @@ where
// based on the new resulting 'state'.
futures::select_biased! {
// Use `select_biased!` to prioritize order below.
// Make sure to pump gossip engine.
_ = gossip_engine => {
error!(target: LOG_TARGET, "🥩 Gossip engine has terminated, closing worker.");
return;
},
// Process finality notifications first since these drive the voter.
notification = finality_notifications.next() => {
if let Some(notification) = notification {
Expand All @@ -901,6 +896,11 @@ where
return;
}
},
// Make sure to pump gossip engine.
_ = gossip_engine => {
error!(target: LOG_TARGET, "🥩 Gossip engine has terminated, closing worker.");
return;
},
// Process incoming justifications as these can make some in-flight votes obsolete.
justif = self.on_demand_justifications.next().fuse() => {
if let Some(justif) = justif {
Expand Down

0 comments on commit 39510ec

Please sign in to comment.