Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(scan): Start scanner gRPC server with zebrad #8241

Merged
merged 15 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6064,6 +6064,7 @@ dependencies = [
"vergen",
"zebra-chain",
"zebra-consensus",
"zebra-grpc",
"zebra-network",
"zebra-node-services",
"zebra-rpc",
Expand Down
5 changes: 5 additions & 0 deletions zebra-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@
#![doc(html_root_url = "https://docs.rs/zebra_grpc")]

pub mod server;

/// The generated scanner proto
pub mod scanner {
tonic::include_proto!("scanner");
}
23 changes: 12 additions & 11 deletions zebra-grpc/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
//! The gRPC server implementation

use std::net::SocketAddr;

use futures_util::future::TryFutureExt;
use tonic::{transport::Server, Response, Status};
use tower::ServiceExt;

use scanner::scanner_server::{Scanner, ScannerServer};
use scanner::{ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply};

use zebra_node_services::scan_service::{
request::Request as ScanServiceRequest, response::Response as ScanServiceResponse,
};

/// The generated scanner proto
pub mod scanner {
tonic::include_proto!("scanner");
}
use crate::scanner::{
scanner_server::{Scanner, ScannerServer},
ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply,
};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down Expand Up @@ -61,7 +60,7 @@ where
));
};

let reply = scanner::InfoReply {
let reply = InfoReply {
min_sapling_birthday_height: min_sapling_birthday_height.0,
};

Expand Down Expand Up @@ -124,7 +123,10 @@ where
}

/// Initializes the zebra-scan gRPC server
pub async fn init<ScanService>(scan_service: ScanService) -> Result<(), color_eyre::Report>
pub async fn init<ScanService>(
listen_addr: SocketAddr,
scan_service: ScanService,
) -> Result<(), color_eyre::Report>
where
ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
+ Clone
Expand All @@ -133,12 +135,11 @@ where
+ 'static,
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
{
let addr = "[::1]:50051".parse()?;
let service = ScannerRPC { scan_service };

Server::builder()
.add_service(ScannerServer::new(service))
.serve(addr)
.serve(listen_addr)
.await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion zebra-scan/src/bin/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let scan_service = ServiceBuilder::new().buffer(10).service(scan_service);

// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
zebra_grpc::server::init("127.0.0.1:8231".parse()?, scan_service).await?;
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
19 changes: 18 additions & 1 deletion zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Configuration for blockchain scanning tasks.

use std::fmt::Debug;
use std::{fmt::Debug, net::SocketAddr};

use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
Expand All @@ -20,6 +20,20 @@ pub struct Config {
// TODO: allow keys without birthdays
pub sapling_keys_to_scan: IndexMap<SaplingScanningKey, u32>,

/// IP address and port for the zebra-scan gRPC server.
///
/// Note: The gRPC server is disabled by default.
/// To enable the gRPC server, set a listen address in the config:
/// ```toml
/// [shielded-scan]
/// listen_addr = '127.0.0.1:8231'
/// ```
///
/// The recommended ports for the gRPC server are:
/// - Mainnet: 127.0.0.1:8231
/// - Testnet: 127.0.0.1:18231
pub listen_addr: Option<SocketAddr>,

/// The scanner results database config.
//
// TODO: Remove fields that are only used by the state, and create a common database config.
Expand All @@ -41,6 +55,9 @@ impl Default for Config {
fn default() -> Self {
Self {
sapling_keys_to_scan: IndexMap::new(),
listen_addr: None,

// TODO: Add a const generic for specifying the default cache_dir path, like 'zebra' or 'zebra-scan'?
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
db_config: DbConfig::default(),
}
}
Expand Down
55 changes: 45 additions & 10 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,66 @@
//! Initializing the scanner and gRPC server.

use std::net::SocketAddr;

use color_eyre::Report;
use tokio::task::JoinHandle;
use tower::ServiceBuilder;

use zebra_chain::parameters::Network;
use tracing::Instrument;
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_state::ChainTipChange;

use crate::{scan, service::ScanService, Config};
use crate::{scan, service::ScanService, storage::Storage, Config};

/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.
pub async fn init(
pub async fn init_with_server(
listen_addr: SocketAddr,
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Result<(), Report> {
let scan_service = ServiceBuilder::new().buffer(10).service(ScanService::new(
&config,
network,
state,
chain_tip_change,
));
info!(?config, "starting scan service");
let scan_service = ServiceBuilder::new()
.buffer(10)
.service(ScanService::new(&config, network, state, chain_tip_change).await);

// TODO: move this to zebra-grpc init() function and include addr
info!("starting scan gRPC server");

// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
zebra_grpc::server::init(listen_addr, scan_service).await?;

Ok(())
}

/// Initialize the scanner and its gRPC server based on its config, and spawn a task for it.
pub fn spawn_init(
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
if let Some(listen_addr) = config.listen_addr {
// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(
init_with_server(listen_addr, config, network, state, chain_tip_change)
.in_current_span(),
)
} else {
// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(
async move {
let storage =
tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;
let (_cmd_sender, cmd_receiver) = std::sync::mpsc::channel();
scan::start(state, chain_tip_change, storage, cmd_receiver).await
}
.in_current_span(),
)
}
}
2 changes: 1 addition & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ pub use service::scan_task::scan;
pub mod tests;

pub use config::Config;
pub use init::init;
pub use init::{init_with_server, spawn_init};

pub use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};
13 changes: 9 additions & 4 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Dur
use futures::future::FutureExt;
use tower::Service;

use zebra_chain::{parameters::Network, transaction::Hash};
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Hash};

use zebra_state::ChainTipChange;

Expand Down Expand Up @@ -36,15 +36,20 @@ const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);

impl ScanService {
/// Create a new [`ScanService`].
pub fn new(
pub async fn new(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
let config = config.clone();
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;

Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::spawn(config, network, state, chain_tip_change),
scan_task: ScanTask::spawn(storage.clone(), state, chain_tip_change),
db: storage,
}
}

Expand Down
18 changes: 3 additions & 15 deletions zebra-scan/src/service/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use std::sync::{mpsc, Arc};
use color_eyre::Report;
use tokio::task::JoinHandle;

use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;

use crate::Config;
use crate::storage::Storage;

mod commands;
mod executor;
Expand All @@ -31,23 +30,12 @@ pub struct ScanTask {

impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
pub fn spawn(db: Storage, state: scan::State, chain_tip_change: ChainTipChange) -> Self {
// TODO: Use a bounded channel or move this logic to the scan service or another service.
let (cmd_sender, cmd_receiver) = mpsc::channel();

Self {
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
handle: Arc::new(scan::spawn_init(db, state, chain_tip_change, cmd_receiver)),
cmd_sender,
}
}
Expand Down
27 changes: 2 additions & 25 deletions zebra-scan/src/service/scan_task/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
use crate::{
service::{ScanTask, ScanTaskCommand},
storage::{SaplingScanningKey, Storage},
Config,
};

use super::executor;
Expand Down Expand Up @@ -489,32 +488,10 @@ async fn tip_height(mut state: State) -> Result<Height, Report> {
///
/// TODO: add a test for this function.
pub fn spawn_init(
config: &Config,
network: Network,
storage: Storage,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> JoinHandle<Result<(), Report>> {
let config = config.clone();

// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(init(config, network, state, chain_tip_change, cmd_receiver).in_current_span())
}

/// Initialize the scanner based on its config.
///
/// TODO: add a test for this function.
pub async fn init(
config: Config,
network: Network,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;

// TODO: add more tasks here?
start(state, chain_tip_change, storage, cmd_receiver).await
tokio::spawn(start(state, chain_tip_change, storage, cmd_receiver).in_current_span())
}
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ zebra-scan = { path = "../zebra-scan", features = ["proptest-impl"] }
zebra-node-services = { path = "../zebra-node-services", features = ["rpc-client"] }

zebra-test = { path = "../zebra-test" }
zebra-grpc = { path = "../zebra-grpc" }

# Used by the checkpoint generation tests via the zebra-checkpoints feature
# (the binaries in this crate won't be built unless their features are enabled).
Expand Down
29 changes: 10 additions & 19 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,25 +301,16 @@ impl StartCmd {

#[cfg(feature = "shielded-scan")]
// Spawn never ending scan task only if we have keys to scan for.
let (scan_task_handle, _cmd_sender) =
if !config.shielded_scan.sapling_keys_to_scan.is_empty() {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
let scan_task = zebra_scan::service::scan_task::ScanTask::spawn(
&config.shielded_scan,
config.network.network,
state,
chain_tip_change,
);

(
std::sync::Arc::into_inner(scan_task.handle)
.expect("should only have one reference here"),
Some(scan_task.cmd_sender),
)
} else {
(tokio::spawn(std::future::pending().in_current_span()), None)
};
let scan_task_handle = {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
zebra_scan::spawn_init(
config.shielded_scan.clone(),
config.network.network,
state,
chain_tip_change,
)
};

#[cfg(not(feature = "shielded-scan"))]
// Spawn a dummy scan task which doesn't do anything and never finishes.
Expand Down
Loading
Loading