Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(scan): Improve gRPC method errors and add timeout to scan service to avoid hanging #8318

Merged
merged 14 commits into from
Mar 14, 2024
Merged
83 changes: 58 additions & 25 deletions zebra-grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
use futures_util::future::TryFutureExt;
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::{transport::Server, Request, Response, Status};
use tower::ServiceExt;
use tower::{timeout::error::Elapsed, ServiceExt};

use zebra_chain::{block::Height, transaction};
use zebra_node_services::scan_service::{
Expand Down Expand Up @@ -70,34 +70,57 @@ where
.into_iter()
.map(|KeyWithHeight { key, height }| (key, height))
.collect();

let register_keys_response_fut = self
.scan_service
.clone()
.oneshot(ScanServiceRequest::RegisterKeys(keys.clone()));
.ready()
.await
.map_err(|_| Status::unknown("service poll_ready() method returned an error"))?
.call(ScanServiceRequest::RegisterKeys(keys.clone()));

let keys: Vec<_> = keys.into_iter().map(|(key, _start_at)| key).collect();

let subscribe_results_response_fut =
self.scan_service
.clone()
.oneshot(ScanServiceRequest::SubscribeResults(
keys.iter().cloned().collect(),
));
let subscribe_results_response_fut = self
.scan_service
.clone()
.ready()
.await
.map_err(|_| Status::unknown("service poll_ready() method returned an error"))?
.call(ScanServiceRequest::SubscribeResults(
keys.iter().cloned().collect(),
));

let (register_keys_response, subscribe_results_response) =
tokio::join!(register_keys_response_fut, subscribe_results_response_fut);

let ScanServiceResponse::RegisteredKeys(_) = register_keys_response
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
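// Defers non-timeout errors from the register keys request instead of returning them immediately:
// the scan service returns an error when no new keys were registered, which is expected if the keys
// are already registered or if none of them could be parsed as Sapling efvks. The deferred error is
// only reported (as an invalid argument) if the SubscribeResults request below also fails.
// Timeouts from the register keys request are returned to the caller right away.
//
// This method will still return an error if every key in the `scan` request is invalid, since
// the SubscribeResults request will return an error once the `rsp_tx` is dropped in `ScanTask::process_messages`
// when it finds that none of the keys in the request are registered.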
arya2 marked this conversation as resolved.
Show resolved Hide resolved
let register_keys_err = match register_keys_response {
Ok(ScanServiceResponse::RegisteredKeys(_)) => None,
Ok(response) => {
return Err(Status::internal(format!(
"unexpected response from scan service: {response:?}"
)))
}
Err(err) if err.downcast_ref::<Elapsed>().is_some() => {
return Err(Status::deadline_exceeded(
"scan service requests timed out, is Zebra synced past Sapling activation height?")
)
}
Err(err) => Some(err),
};

let ScanServiceResponse::SubscribeResults(mut results_receiver) =
subscribe_results_response
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
subscribe_results_response.map_err(|err| {
register_keys_err
.map(|err| Status::invalid_argument(err.to_string()))
.unwrap_or(Status::internal(err.to_string()))
})?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
Expand Down Expand Up @@ -179,7 +202,7 @@ where
.ready()
.and_then(|service| service.call(ScanServiceRequest::Info))
.await
.map_err(|_| Status::unknown("scan service was unavailable"))?
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
Expand Down Expand Up @@ -217,20 +240,30 @@ where
return Err(Status::invalid_argument(msg));
}

let ScanServiceResponse::RegisteredKeys(keys) = self
match self
.scan_service
.clone()
.ready()
.and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys)))
.await
.map_err(|_| Status::unknown("scan service was unavailable"))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
};
{
Ok(ScanServiceResponse::RegisteredKeys(keys)) => {
Ok(Response::new(RegisterKeysResponse { keys }))
}

Ok(response) => {
return Err(Status::internal(format!(
"unexpected response from scan service: {response:?}"
)))
}

Err(err) if err.downcast_ref::<Elapsed>().is_some() => Err(Status::deadline_exceeded(
"RegisterKeys scan service request timed out, \
is Zebra synced past Sapling activation height?",
)),

Ok(Response::new(RegisterKeysResponse { keys }))
Err(err) => Err(Status::unknown(err.to_string())),
}
}

async fn clear_results(
Expand Down
3 changes: 0 additions & 3 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ pub enum Request {
/// Requests general info about the scanner
Info,

/// TODO: Accept `KeyHash`es and return key hashes that are registered
CheckKeyHashes(Vec<()>),

/// Submits viewing keys with their optional birth-heights for scanning.
RegisterKeys(Vec<(String, Option<u32>)>),

Expand Down
6 changes: 5 additions & 1 deletion zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Initializing the scanner and gRPC server.

use std::net::SocketAddr;
use std::{net::SocketAddr, time::Duration};

use color_eyre::Report;
use tokio::task::JoinHandle;
Expand All @@ -12,6 +12,9 @@ use zebra_state::ChainTipChange;

use crate::{scan, service::ScanService, storage::Storage, Config};

/// The timeout applied to scan service calls.
///
/// `init_with_server` passes this to the `timeout` layer that wraps the `ScanService`, so that a
/// call which takes longer than this fails with a timeout error instead of hanging.
pub const SCAN_SERVICE_TIMEOUT: Duration = Duration::from_secs(30);

/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.
Expand All @@ -25,6 +28,7 @@ pub async fn init_with_server(
info!(?config, "starting scan service");
let scan_service = ServiceBuilder::new()
.buffer(10)
.timeout(SCAN_SERVICE_TIMEOUT)
.service(ScanService::new(&config, network, state, chain_tip_change).await);

// TODO: move this to zebra-grpc init() function and include addr
Expand Down
29 changes: 16 additions & 13 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Duration};

use futures::future::FutureExt;
use tower::Service;
use tower::{BoxError, Service};

use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Hash};

Expand Down Expand Up @@ -32,6 +32,9 @@ pub struct ScanService {
}

/// A timeout applied to `DeleteKeys` requests.
///
/// Bounds how long a `DeleteKeys` request waits for confirmation that the keys were removed from
/// the scan task; once it elapses, the keys are deleted from the database anyway.
///
/// This should be shorter than [`SCAN_SERVICE_TIMEOUT`](crate::init::SCAN_SERVICE_TIMEOUT) so the
/// request can try to delete entries from storage after the timeout before the future is dropped.
const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);

impl ScanService {
Expand Down Expand Up @@ -64,7 +67,7 @@ impl ScanService {

impl Service<Request> for ScanService {
type Response = Response;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

Expand Down Expand Up @@ -97,17 +100,17 @@ impl Service<Request> for ScanService {
.boxed();
}

Request::CheckKeyHashes(_key_hashes) => {
// TODO: check that these entries exist in db
}

Request::RegisterKeys(keys) => {
let mut scan_task = self.scan_task.clone();

return async move {
Ok(Response::RegisteredKeys(
scan_task.register_keys(keys)?.await?,
))
let newly_registered_keys = scan_task.register_keys(keys)?.await?;
if !newly_registered_keys.is_empty() {
Ok(Response::RegisteredKeys(newly_registered_keys))
} else {
Err("no keys were registered, check that keys are not already registered and \
are valid Sapling extended full viewing keys".into())
}
}
.boxed();
}
Expand All @@ -123,7 +126,7 @@ impl Service<Request> for ScanService {
scan_task.remove_keys(keys.clone())?,
)
.await
.map_err(|_| "timeout waiting for delete keys done notification");
.map_err(|_| "request timed out removing keys from scan task".to_string());

// Delete the key from the database after either confirmation that it's been removed from the scan task, or
// waiting `DELETE_KEY_TIMEOUT`.
Expand Down Expand Up @@ -171,7 +174,9 @@ impl Service<Request> for ScanService {
let mut scan_task = self.scan_task.clone();

return async move {
let results_receiver = scan_task.subscribe(keys).await?;
let results_receiver = scan_task.subscribe(keys)?.await.map_err(|_| {
"scan task dropped responder, check that keys are registered"
})?;

Ok(Response::SubscribeResults(results_receiver))
}
Expand All @@ -193,7 +198,5 @@ impl Service<Request> for ScanService {
.boxed();
}
}

async move { Ok(Response::Results(BTreeMap::new())) }.boxed()
}
}
7 changes: 3 additions & 4 deletions zebra-scan/src/service/scan_task/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use tokio::sync::{
oneshot,
};

use tower::BoxError;
use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};
use zebra_chain::{block::Height, parameters::Network};
use zebra_node_services::scan_service::response::ScanResult;
Expand Down Expand Up @@ -207,14 +206,14 @@ impl ScanTask {
/// Sends a message to the scan task to start sending the results for the provided viewing keys to a channel.
///
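/// Returns a oneshot receiver for the scan task's response, which yields the results channel
/// receiver, or an error if the message could not be sent to the scan task.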
pub async fn subscribe(
pub fn subscribe(
&mut self,
keys: HashSet<SaplingScanningKey>,
) -> Result<Receiver<ScanResult>, BoxError> {
) -> Result<oneshot::Receiver<Receiver<ScanResult>>, TrySendError<ScanTaskCommand>> {
let (rsp_tx, rsp_rx) = oneshot::channel();

self.send(ScanTaskCommand::SubscribeResults { keys, rsp_tx })?;

Ok(rsp_rx.await?)
Ok(rsp_rx)
}
}
7 changes: 6 additions & 1 deletion zebra-scan/src/service/scan_task/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,12 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> {
let subscribe_keys: HashSet<String> = sapling_keys[..5].iter().cloned().collect();
let result_receiver_fut = {
let mut mock_scan_task = mock_scan_task.clone();
tokio::spawn(async move { mock_scan_task.subscribe(subscribe_keys.clone()).await })
tokio::spawn(async move {
mock_scan_task
.subscribe(subscribe_keys.clone())
.expect("should send subscribe msg successfully")
.await
})
};

// Wait for spawned task to send subscribe message
Expand Down
83 changes: 82 additions & 1 deletion zebra-scan/src/service/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! Tests for ScanService.

use std::time::Duration;

use futures::{stream::FuturesOrdered, StreamExt};
use tokio::sync::mpsc::error::TryRecvError;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower::{timeout::error::Elapsed, Service, ServiceBuilder, ServiceExt};

use color_eyre::{eyre::eyre, Result};

Expand All @@ -10,6 +13,7 @@ use zebra_node_services::scan_service::{request::Request, response::Response};
use zebra_state::TransactionIndex;

use crate::{
init::SCAN_SERVICE_TIMEOUT,
service::{scan_task::ScanTaskCommand, ScanService},
storage::db::tests::{fake_sapling_results, new_test_storage},
tests::{mock_sapling_scanning_keys, ZECPAGES_SAPLING_VIEWING_KEY},
Expand Down Expand Up @@ -329,5 +333,82 @@ async fn scan_service_registers_keys_correctly_for(network: Network) -> Result<(
_ => panic!("scan service should have responded with the `RegisteredKeys` response"),
}

// Try registering invalid keys.
let register_keys_error_message = scan_service
.ready()
.await
.map_err(|err| eyre!(err))?
.call(Request::RegisterKeys(vec![(
"invalid key".to_string(),
None,
)]))
.await
.expect_err("response should be an error when there are no valid keys to be added")
.to_string();

assert!(
register_keys_error_message.starts_with("no keys were registered"),
"error message should say that no keys were registered"
);

Ok(())
}

/// Test that the scan service with a timeout layer returns timeout errors after the expected timeout.
///
/// The service is wrapped in the same `buffer(10)` and `timeout(SCAN_SERVICE_TIMEOUT)` layers that
/// are used when initializing the scan service. Each request must fail before an outer timeout
/// that is one second longer than `SCAN_SERVICE_TIMEOUT`.
#[tokio::test]
async fn scan_service_timeout() -> Result<()> {
arya2 marked this conversation as resolved.
Show resolved Hide resolved
let db = new_test_storage(Network::Mainnet);

// `_cmd_receiver` is kept alive but never read from in this test.
// NOTE(review): presumably this is what leaves the requests unanswered until a timeout fires — confirm.
let (scan_service, _cmd_receiver) = ScanService::new_with_mock_scanner(db);
let mut scan_service = ServiceBuilder::new()
.buffer(10)
.timeout(SCAN_SERVICE_TIMEOUT)
.service(scan_service);

let keys = vec![String::from("fake key")];
let mut response_futs = FuturesOrdered::new();

// Queue one request of each kind without awaiting the responses yet.
for request in [
Request::RegisterKeys(keys.iter().cloned().map(|key| (key, None)).collect()),
Request::SubscribeResults(keys.iter().cloned().collect()),
Request::DeleteKeys(keys),
] {
let response_fut = scan_service
.ready()
.await
.expect("service should be ready")
.call(request);

// The outer timeout is one second longer than the service timeout, so it should only
// fire if the service fails to respond at all.
response_futs.push_back(tokio::time::timeout(
SCAN_SERVICE_TIMEOUT
.checked_add(Duration::from_secs(1))
.expect("should not overflow"),
response_fut,
));
}

// Unwraps the stream item and the outer timeout result, then expects the service's own
// result to be an error and returns that error.
let expect_timeout_err = |response: Option<Result<Result<_, _>, _>>| {
response
.expect("response_futs should not be empty")
.expect("service should respond with timeout error before outer timeout")
.expect_err("service response should be a timeout error")
};

// RegisterKeys and SubscribeResults should return `Elapsed` errors from `Timeout` layer
for _ in 0..2 {
let response = response_futs.next().await;
expect_timeout_err(response)
.downcast::<Elapsed>()
.expect("service should return Elapsed error from Timeout layer");
}

// The last response is for DeleteKeys. It is checked by its error message, which should start
// with "request timed out", rather than by downcasting to `Elapsed` like the first two.
let response = response_futs.next().await;
let response_error_msg = expect_timeout_err(response).to_string();

assert!(
response_error_msg.starts_with("request timed out"),
"error message should say the request timed out"
);

Ok(())
}
Loading
Loading