Skip to content

Commit

Permalink
add(scan): Start scanner gRPC server with zebrad (#8241)
Browse files Browse the repository at this point in the history
* adds clear_results RPC method for zebra-scan

* adds delete_keys rpc method

* adds docs

* Update zebra-grpc/proto/scanner.proto

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

* Apply suggestions from code review

* start zebra-scan gRPC server from zebrad start command

* adds a test that the scanner starts with zebrad

* adds a `listen_addr` field to the shielded scan config

* updates test to use a random port and set the listen_addr config field

* fixes test

* Update zebra-scan/src/config.rs

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

* fixes panic when trying to open multiple mutable storage instances.

* open db in blocking task

* fixes test

---------

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
  • Loading branch information
arya2 and oxarbitrage authored Feb 7, 2024
1 parent 1cfed24 commit 2c0bc3a
Show file tree
Hide file tree
Showing 15 changed files with 161 additions and 94 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6065,6 +6065,7 @@ dependencies = [
"vergen",
"zebra-chain",
"zebra-consensus",
"zebra-grpc",
"zebra-network",
"zebra-node-services",
"zebra-rpc",
Expand Down
5 changes: 5 additions & 0 deletions zebra-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@
#![doc(html_root_url = "https://docs.rs/zebra_grpc")]

pub mod server;

/// The generated scanner proto
pub mod scanner {
tonic::include_proto!("scanner");
}
23 changes: 12 additions & 11 deletions zebra-grpc/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
//! The gRPC server implementation
use std::net::SocketAddr;

use futures_util::future::TryFutureExt;
use tonic::{transport::Server, Response, Status};
use tower::ServiceExt;

use scanner::scanner_server::{Scanner, ScannerServer};
use scanner::{ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply};

use zebra_node_services::scan_service::{
request::Request as ScanServiceRequest, response::Response as ScanServiceResponse,
};

/// The generated scanner proto
pub mod scanner {
tonic::include_proto!("scanner");
}
use crate::scanner::{
scanner_server::{Scanner, ScannerServer},
ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply,
};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down Expand Up @@ -61,7 +60,7 @@ where
));
};

let reply = scanner::InfoReply {
let reply = InfoReply {
min_sapling_birthday_height: min_sapling_birthday_height.0,
};

Expand Down Expand Up @@ -124,7 +123,10 @@ where
}

/// Initializes the zebra-scan gRPC server
pub async fn init<ScanService>(scan_service: ScanService) -> Result<(), color_eyre::Report>
pub async fn init<ScanService>(
listen_addr: SocketAddr,
scan_service: ScanService,
) -> Result<(), color_eyre::Report>
where
ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
+ Clone
Expand All @@ -133,12 +135,11 @@ where
+ 'static,
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
{
let addr = "[::1]:50051".parse()?;
let service = ScannerRPC { scan_service };

Server::builder()
.add_service(ScannerServer::new(service))
.serve(addr)
.serve(listen_addr)
.await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion zebra-scan/src/bin/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let scan_service = ServiceBuilder::new().buffer(10).service(scan_service);

// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
zebra_grpc::server::init("127.0.0.1:8231".parse()?, scan_service).await?;

Ok(())
}
19 changes: 18 additions & 1 deletion zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Configuration for blockchain scanning tasks.
use std::fmt::Debug;
use std::{fmt::Debug, net::SocketAddr};

use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
Expand All @@ -20,6 +20,20 @@ pub struct Config {
// TODO: allow keys without birthdays
pub sapling_keys_to_scan: IndexMap<SaplingScanningKey, u32>,

/// IP address and port for the zebra-scan gRPC server.
///
/// Note: The gRPC server is disabled by default.
/// To enable the gRPC server, set a listen address in the config:
/// ```toml
/// [shielded-scan]
/// listen_addr = '127.0.0.1:8231'
/// ```
///
/// The recommended ports for the gRPC server are:
/// - Mainnet: 127.0.0.1:8231
/// - Testnet: 127.0.0.1:18231
pub listen_addr: Option<SocketAddr>,

/// The scanner results database config.
//
// TODO: Remove fields that are only used by the state, and create a common database config.
Expand All @@ -41,6 +55,9 @@ impl Default for Config {
fn default() -> Self {
Self {
sapling_keys_to_scan: IndexMap::new(),
listen_addr: None,

// TODO: Add a const generic for specifying the default cache_dir path, like 'zebra' or 'zebra-scan'?
db_config: DbConfig::default(),
}
}
Expand Down
55 changes: 45 additions & 10 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,66 @@
//! Initializing the scanner and gRPC server.
use std::net::SocketAddr;

use color_eyre::Report;
use tokio::task::JoinHandle;
use tower::ServiceBuilder;

use zebra_chain::parameters::Network;
use tracing::Instrument;
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_state::ChainTipChange;

use crate::{scan, service::ScanService, Config};
use crate::{scan, service::ScanService, storage::Storage, Config};

/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.
pub async fn init(
pub async fn init_with_server(
listen_addr: SocketAddr,
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Result<(), Report> {
let scan_service = ServiceBuilder::new().buffer(10).service(ScanService::new(
&config,
network,
state,
chain_tip_change,
));
info!(?config, "starting scan service");
let scan_service = ServiceBuilder::new()
.buffer(10)
.service(ScanService::new(&config, network, state, chain_tip_change).await);

// TODO: move this to zebra-grpc init() function and include addr
info!("starting scan gRPC server");

// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
zebra_grpc::server::init(listen_addr, scan_service).await?;

Ok(())
}

/// Initialize the scanner and its gRPC server based on its config, and spawn a task for it.
pub fn spawn_init(
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
if let Some(listen_addr) = config.listen_addr {
// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(
init_with_server(listen_addr, config, network, state, chain_tip_change)
.in_current_span(),
)
} else {
// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(
async move {
let storage =
tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;
let (_cmd_sender, cmd_receiver) = std::sync::mpsc::channel();
scan::start(state, chain_tip_change, storage, cmd_receiver).await
}
.in_current_span(),
)
}
}
2 changes: 1 addition & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ pub use service::scan_task::scan;
pub mod tests;

pub use config::Config;
pub use init::init;
pub use init::{init_with_server, spawn_init};

pub use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};
13 changes: 9 additions & 4 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Dur
use futures::future::FutureExt;
use tower::Service;

use zebra_chain::{parameters::Network, transaction::Hash};
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Hash};

use zebra_state::ChainTipChange;

Expand Down Expand Up @@ -36,15 +36,20 @@ const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);

impl ScanService {
/// Create a new [`ScanService`].
pub fn new(
pub async fn new(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
let config = config.clone();
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;

Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::spawn(config, network, state, chain_tip_change),
scan_task: ScanTask::spawn(storage.clone(), state, chain_tip_change),
db: storage,
}
}

Expand Down
18 changes: 3 additions & 15 deletions zebra-scan/src/service/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use std::sync::{mpsc, Arc};
use color_eyre::Report;
use tokio::task::JoinHandle;

use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;

use crate::Config;
use crate::storage::Storage;

mod commands;
mod executor;
Expand All @@ -31,23 +30,12 @@ pub struct ScanTask {

impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
pub fn spawn(db: Storage, state: scan::State, chain_tip_change: ChainTipChange) -> Self {
// TODO: Use a bounded channel or move this logic to the scan service or another service.
let (cmd_sender, cmd_receiver) = mpsc::channel();

Self {
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
handle: Arc::new(scan::spawn_init(db, state, chain_tip_change, cmd_receiver)),
cmd_sender,
}
}
Expand Down
27 changes: 2 additions & 25 deletions zebra-scan/src/service/scan_task/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
use crate::{
service::{ScanTask, ScanTaskCommand},
storage::{SaplingScanningKey, Storage},
Config,
};

use super::executor;
Expand Down Expand Up @@ -489,32 +488,10 @@ async fn tip_height(mut state: State) -> Result<Height, Report> {
///
/// TODO: add a test for this function.
pub fn spawn_init(
config: &Config,
network: Network,
storage: Storage,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> JoinHandle<Result<(), Report>> {
let config = config.clone();

// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(init(config, network, state, chain_tip_change, cmd_receiver).in_current_span())
}

/// Initialize the scanner based on its config.
///
/// TODO: add a test for this function.
pub async fn init(
config: Config,
network: Network,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;

// TODO: add more tasks here?
start(state, chain_tip_change, storage, cmd_receiver).await
tokio::spawn(start(state, chain_tip_change, storage, cmd_receiver).in_current_span())
}
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ zebra-scan = { path = "../zebra-scan", features = ["proptest-impl"] }
zebra-node-services = { path = "../zebra-node-services", features = ["rpc-client"] }

zebra-test = { path = "../zebra-test" }
zebra-grpc = { path = "../zebra-grpc" }

# Used by the checkpoint generation tests via the zebra-checkpoints feature
# (the binaries in this crate won't be built unless their features are enabled).
Expand Down
29 changes: 10 additions & 19 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,25 +301,16 @@ impl StartCmd {

#[cfg(feature = "shielded-scan")]
// Spawn never ending scan task only if we have keys to scan for.
let (scan_task_handle, _cmd_sender) =
if !config.shielded_scan.sapling_keys_to_scan.is_empty() {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
let scan_task = zebra_scan::service::scan_task::ScanTask::spawn(
&config.shielded_scan,
config.network.network,
state,
chain_tip_change,
);

(
std::sync::Arc::into_inner(scan_task.handle)
.expect("should only have one reference here"),
Some(scan_task.cmd_sender),
)
} else {
(tokio::spawn(std::future::pending().in_current_span()), None)
};
let scan_task_handle = {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
zebra_scan::spawn_init(
config.shielded_scan.clone(),
config.network.network,
state,
chain_tip_change,
)
};

#[cfg(not(feature = "shielded-scan"))]
// Spawn a dummy scan task which doesn't do anything and never finishes.
Expand Down
Loading

0 comments on commit 2c0bc3a

Please sign in to comment.