Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pvf: Fix missing execution request when retrying preparation #6537

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 88 additions & 13 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,10 @@ async fn handle_execute_pvf(
},
)
.await?;

// Add an execution request that will wait to run after this prepare job has
// finished.
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
Expand Down Expand Up @@ -931,6 +935,13 @@ mod tests {
ValidationHost { to_host_tx }
}

async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
where
T: Send,
{
run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
}

async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
Expand Down Expand Up @@ -991,7 +1002,7 @@ mod tests {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
msg = to_sweeper_rx.next().fuse() => {
panic!("the sweeper supposed to be empty, but received: {:?}", msg)
panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
}
}
}
Expand Down Expand Up @@ -1311,12 +1322,12 @@ mod tests {
// Test that multiple prechecking requests do not trigger preparation retries if the first one
// failed.
#[tokio::test]
async fn test_precheck_prepare_retry() {
async fn test_precheck_prepare_no_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Submit a precheck request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

// The queue received the prepare request.
Expand All @@ -1333,22 +1344,34 @@ mod tests {
.await
.unwrap();

// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(PrepareError::TimedOut));

// Submit another precheck request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();

// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(PrepareError::TimedOut));

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another precheck request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();

// Assert the prepare queue is empty - we do not retry for precheck requests.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(result, Err(PrepareError::TimedOut));
}

// Test that multiple execution requests trigger preparation retries if the first one failed due
Expand All @@ -1359,7 +1382,7 @@ mod tests {
let mut host = test.host_handle();

// Submit a execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1384,8 +1407,12 @@ mod tests {
.await
.unwrap();

// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));

// Submit another execute request. We shouldn't try to prepare again, yet.
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1399,11 +1426,15 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1419,6 +1450,29 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);

test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();

// Preparation should have been retried and succeeded this time.
let result_tx_3 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
);

// Send an error for the execution here, just so the result receiver gets something.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first look I've had at the PVF code. Cool stuff!

Out of curiosity, why is it important that the result receiver gets something here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! This seemed like an easy, bite-sized intro to the module. 🙂

Out of curiosity, why is it important that the result receiver gets something here?

I wanted to test that the receiver was still operational, i.e. not dropped. I guess alternatively I could poll it and it should return Ok(None), but it felt more complete to send something. And an Ok value was more complicated to send as it would require sending a worker. Does that answer your question? Let me know if I can clarify this, so it's more apparent to readers of the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is totally optional, but modifying the comment to something like "just so we can check the result receiver is still alive" would've been more clear to me.

result_tx_3
.send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
.unwrap();
assert_matches!(
result_rx_3.now_or_never().unwrap().unwrap(),
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
);
}

// Test that multiple execution requests don't trigger preparation retries if the first one
Expand All @@ -1428,8 +1482,8 @@ mod tests {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Submit a execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
// Submit an execute request that fails.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1454,8 +1508,15 @@ mod tests {
.await
.unwrap();

// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);

// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1469,11 +1530,18 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1486,6 +1554,13 @@ mod tests {

// Assert the prepare queue is empty - we do not retry for prevalidation errors.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);
}

// Test that multiple heads-up requests trigger preparation retries if the first one failed.
Expand Down