From 0e7dad5c0e676de9e0e6e0b2f38a46d8f7682e66 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 10 Jan 2023 17:06:55 -0500 Subject: [PATCH 1/3] pvf: Add checks for result sender when retrying preparation in tests --- node/core/pvf/src/host.rs | 78 ++++++++++++++++++++++++++++++++------- 1 file changed, 65 insertions(+), 13 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index b41ebd3c425b..ba59ab69873b 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -931,6 +931,13 @@ mod tests { ValidationHost { to_host_tx } } + async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T + where + T: Send, + { + run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await + } + async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue { let to_prepare_queue_rx = &mut self.to_prepare_queue_rx; run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed()) @@ -991,7 +998,7 @@ mod tests { futures::select! { _ = Delay::new(Duration::from_millis(500)).fuse() => (), msg = to_sweeper_rx.next().fuse() => { - panic!("the sweeper supposed to be empty, but received: {:?}", msg) + panic!("the sweeper is supposed to be empty, but received: {:?}", msg) } } } @@ -1311,12 +1318,12 @@ mod tests { // Test that multiple prechecking requests do not trigger preparation retries if the first one // failed. #[tokio::test] - async fn test_precheck_prepare_retry() { + async fn test_precheck_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); // Submit a precheck request that fails. - let (result_tx, _result_rx) = oneshot::channel(); + let (result_tx, result_rx) = oneshot::channel(); host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); // The queue received the prepare request. @@ -1333,22 +1340,34 @@ mod tests { .await .unwrap(); + // The result should contain the error. 
+ let result = test.poll_and_recv_result(result_rx).await; + assert_matches!(result, Err(PrepareError::TimedOut)); + // Submit another precheck request. - let (result_tx_2, _result_rx_2) = oneshot::channel(); + let (result_tx_2, result_rx_2) = oneshot::channel(); host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap(); // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; + // The result should contain the original error. + let result = test.poll_and_recv_result(result_rx_2).await; + assert_matches!(result, Err(PrepareError::TimedOut)); + // Pause for enough time to reset the cooldown for this failed prepare request. futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another precheck request. - let (result_tx_3, _result_rx_3) = oneshot::channel(); + let (result_tx_3, result_rx_3) = oneshot::channel(); host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap(); // Assert the prepare queue is empty - we do not retry for precheck requests. test.poll_ensure_to_prepare_queue_is_empty().await; + + // The result should still contain the original error. + let result = test.poll_and_recv_result(result_rx_3).await; + assert_matches!(result, Err(PrepareError::TimedOut)); } // Test that multiple execution requests trigger preparation retries if the first one failed due @@ -1359,7 +1378,7 @@ mod tests { let mut host = test.host_handle(); // Submit a execute request that fails. - let (result_tx, _result_rx) = oneshot::channel(); + let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1384,8 +1403,12 @@ mod tests { .await .unwrap(); - // Submit another execute request. - let (result_tx_2, _result_rx_2) = oneshot::channel(); + // The result should contain the error. + let result = test.poll_and_recv_result(result_rx).await; + assert_matches!(result, Err(ValidationError::InternalError(_))); + + // Submit another execute request. 
We shouldn't try to prepare again, yet. + let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1399,11 +1422,15 @@ mod tests { // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; + // The result should contain the original error. + let result = test.poll_and_recv_result(result_rx_2).await; + assert_matches!(result, Err(ValidationError::InternalError(_))); + // Pause for enough time to reset the cooldown for this failed prepare request. futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another execute request. - let (result_tx_3, _result_rx_3) = oneshot::channel(); + let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1419,6 +1446,10 @@ mod tests { test.poll_and_recv_to_prepare_queue().await, prepare::ToQueue::Enqueue { .. } ); + + // Preparation should have been retried and succeeded this time. + let result = test.poll_and_recv_result(result_rx_3).await; + assert_matches!(result, Err(ValidationError::InternalError(_))); } // Test that multiple execution requests don't trigger preparation retries if the first one @@ -1428,8 +1459,8 @@ mod tests { let mut test = Builder::default().build(); let mut host = test.host_handle(); - // Submit a execute request that fails. - let (result_tx, _result_rx) = oneshot::channel(); + // Submit an execute request that fails. + let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1454,8 +1485,15 @@ mod tests { .await .unwrap(); + // The result should contain the error. + let result = test.poll_and_recv_result(result_rx).await; + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_))) + ); + // Submit another execute request. 
- let (result_tx_2, _result_rx_2) = oneshot::channel(); + let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1469,11 +1507,18 @@ mod tests { // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; + // The result should contain the original error. + let result = test.poll_and_recv_result(result_rx_2).await; + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_))) + ); + // Pause for enough time to reset the cooldown for this failed prepare request. futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another execute request. - let (result_tx_3, _result_rx_3) = oneshot::channel(); + let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( Pvf::from_discriminator(1), TEST_EXECUTION_TIMEOUT, @@ -1486,6 +1531,13 @@ mod tests { // Assert the prepare queue is empty - we do not retry for prevalidation errors. test.poll_ensure_to_prepare_queue_is_empty().await; + + // The result should still contain the original error. + let result = test.poll_and_recv_result(result_rx_3).await; + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_))) + ); } // Test that multiple heads-up requests trigger preparation retries if the first one failed. 
From 5cdda3b0f7a22c9821630c087802bfb1b931fad1 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 10 Jan 2023 17:19:42 -0500 Subject: [PATCH 2/3] pvf: Fix missing execution request when retrying preparation --- node/core/pvf/src/host.rs | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index ba59ab69873b..02d7717d0d11 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -550,6 +550,10 @@ async fn handle_execute_pvf( }, ) .await?; + + // Add an execution request that will wait to run after this prepare job has + // finished. + awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); } else { let _ = result_tx.send(Err(ValidationError::from(error.clone()))); } @@ -1447,9 +1451,28 @@ mod tests { prepare::ToQueue::Enqueue { .. } ); + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(1), + result: Ok(Duration::default()), + }) + .await + .unwrap(); + // Preparation should have been retried and succeeded this time. - let result = test.poll_and_recv_result(result_rx_3).await; - assert_matches!(result, Err(ValidationError::InternalError(_))); + let result_tx_3 = assert_matches!( + test.poll_and_recv_to_execute_queue().await, + execute::ToQueue::Enqueue { result_tx, .. } => result_tx + ); + + // Send an error for the execution here, just so the result receiver gets something. 
+ result_tx_3 + .send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))) + .unwrap(); + assert_matches!( + result_rx_3.now_or_never().unwrap().unwrap(), + Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)) + ); } // Test that multiple execution requests don't trigger preparation retries if the first one From 3f1d5b35db265583a24e0508e2cf5b7ecca514b6 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Wed, 11 Jan 2023 14:19:40 -0500 Subject: [PATCH 3/3] Update comment --- node/core/pvf/src/host.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 02d7717d0d11..6b51b8cc1351 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -1465,7 +1465,8 @@ mod tests { execute::ToQueue::Enqueue { result_tx, .. } => result_tx ); - // Send an error for the execution here, just so the result receiver gets something. + // Send an error for the execution here, just so we can check the result receiver is still + // alive. result_tx_3 .send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))) .unwrap();