Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scan blocks with sapling keys and write the results to the database #8040

Merged
merged 14 commits into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5802,6 +5802,7 @@ dependencies = [
"ff",
"group",
"indexmap 2.1.0",
"itertools 0.12.0",
"jubjub",
"rand 0.8.5",
"semver 1.0.20",
Expand Down
3 changes: 2 additions & 1 deletion zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ categories = ["cryptography::cryptocurrencies"]

color-eyre = "0.6.2"
indexmap = { version = "2.0.1", features = ["serde"] }
itertools = "0.12.0"
semver = "1.0.20"
serde = { version = "1.0.193", features = ["serde_derive"] }
tokio = "1.34.0"
tokio = { version = "1.34.0", features = ["time"] }
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
tower = "0.4.13"
tracing = "0.1.39"

Expand Down
4 changes: 3 additions & 1 deletion zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use crate::storage::SaplingScanningKey;
/// Configuration for scanning.
pub struct Config {
/// The sapling keys to scan for and the birthday height of each of them.
///
/// Currently only supports Extended Full Viewing Keys in ZIP-32 format.
//
// TODO: allow keys without birthdays
pub sapling_keys_to_scan: IndexMap<SaplingScanningKey, u32>,

/// The scanner results database config.
//
// TODO: Remove fields that are only used by the state to create a common database config.
// TODO: Remove fields that are only used by the state, and create a common database config.
#[serde(flatten)]
db_config: DbConfig,
}
Expand Down
15 changes: 12 additions & 3 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::task::JoinHandle;
use tracing::Instrument;

use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_state::ChainTipChange;

use crate::{scan, storage::Storage, Config};

Expand All @@ -15,19 +16,27 @@ pub fn spawn_init(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
let config = config.clone();
tokio::spawn(init(config, network, state).in_current_span())

// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(init(config, network, state, chain_tip_change).in_current_span())
}

/// Initialize the scanner based on its config.
///
/// TODO: add a test for this function.
pub async fn init(config: Config, network: Network, state: scan::State) -> Result<(), Report> {
pub async fn init(
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network))
.wait_for_panics()
.await;

// TODO: add more tasks here?
scan::start(state, storage).await
scan::start(state, chain_tip_change, storage).await
}
221 changes: 183 additions & 38 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,184 @@
//! The scanner task and scanning APIs.

use std::{sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use color_eyre::{eyre::eyre, Report};
use itertools::Itertools;
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::info;

use zcash_client_backend::{
data_api::ScannedBlock,
encoding::decode_extended_full_viewing_key,
proto::compact_formats::{
ChainMetadata, CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx,
},
scanning::{ScanError, ScanningKey},
};
use zcash_primitives::zip32::AccountId;
use zcash_primitives::{
constants::*,
sapling::SaplingIvk,
zip32::{AccountId, DiversifiableFullViewingKey, Scope},
};

use zebra_chain::{
block::Block, diagnostic::task::WaitForPanics, parameters::Network,
serialization::ZcashSerialize, transaction::Transaction,
block::Block,
chain_tip::ChainTip,
diagnostic::task::WaitForPanics,
parameters::Network,
serialization::ZcashSerialize,
transaction::{self, Transaction},
};
use zebra_state::{ChainTipChange, SaplingScannedResult};

use crate::storage::Storage;
use crate::storage::{SaplingScanningKey, Storage};

/// The generic state type used by the scanner.
pub type State = Buffer<
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>,
zebra_state::Request,
>;

/// Wait a few seconds at startup so tip height is always `Some`.
const INITIAL_WAIT: Duration = Duration::from_secs(10);
/// Wait a few seconds at startup for some blocks to get verified.
///
/// But sometimes the state might be empty if the network is slow.
const INITIAL_WAIT: Duration = Duration::from_secs(15);

/// The amount of time between checking and starting new scans.
/// The amount of time between checking for new blocks and starting new scans.
///
/// This is just under half the target block interval.
const CHECK_INTERVAL: Duration = Duration::from_secs(30);

/// Start the scan task given state and storage.
///
/// - This function is dummy at the moment. It just makes sure we can read the storage and the state.
/// - Modifications here might have an impact in the `scan_task_starts` test.
/// - Real scanning code functionality will be added in the future here.
pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
// We want to make sure the state has a tip height available before we start scanning.
/// We log an info log with progress after this many blocks.
const INFO_LOG_INTERVAL: u32 = 100_000;

/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
/// `storage`, and then writes the results to `storage`.
pub async fn start(
mut state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
) -> Result<(), Report> {
let network = storage.network();
let mut height = storage.min_sapling_birthday_height();

// Read keys from the storage on disk, which can block async execution.
let key_storage = storage.clone();
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics()
.await;

// Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys.
let parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
> = key_birthdays
.keys()
.map(|key| {
let parsed_keys = sapling_key_to_scan_block_keys(key, network)?;
Ok::<_, Report>((key.clone(), parsed_keys))
})
.try_collect()?;

// Give empty states time to verify some blocks before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await;

loop {
// Make sure we can query the state
let request = state
// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
let block = state
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Tip)
.call(zebra_state::Request::Block(height.into()))
.await
.map_err(|e| eyre!(e));
.map_err(|e| eyre!(e))?;

let tip = match request? {
zebra_state::Response::Tip(tip) => tip,
let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => {
// If we've reached the tip, sleep for a while, then try to get the same block again.
tokio::time::sleep(CHECK_INTERVAL).await;
continue;
}
_ => unreachable!("unmatched response to a state::Tip request"),
};

// Read keys from the storage on disk, which can block.
let key_storage = storage.clone();
let available_keys = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics()
.await;
// Only log at info level every 100,000 blocks
let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;

for key in available_keys {
// TODO: add debug logs?
if is_info_log {
info!(
"Scanning the blockchain for key {} from block {:?} to {:?}",
key.0, key.1, tip,
"Scanning the blockchain: now at block {:?}, current tip {:?}",
height,
chain_tip_change
.latest_chain_tip()
.best_tip_height_and_hash(),
);
}

tokio::time::sleep(CHECK_INTERVAL).await;
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
if is_info_log {
info!(
"Scanning the blockchain for key {}, started at block {:?}",
key_num, birthday_height,
);
}

// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();

// Scan the block, which blocks async execution until the scan is complete.
//
// TODO: skip scanning before birthday height (#8022)
// TODO: scan each key in parallel (after MVP?)
let sapling_key = sapling_key.clone();
let block = block.clone();
let mut storage = storage.clone();

// We use a dummy size of the Sapling note commitment tree.
//
// We can't set the size to zero, because the underlying scanning function would return
// `zcash_client_backend::scanning::ScanError::TreeSizeUnknown`.
//
// And we can't set it close to zero, because the scanner subtracts the number of notes
// in the block, and panics with "attempt to subtract with overflow". A consensus rule
// requires the number of notes in a block to be less than this value.
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
let sapling_tree_size = 1 << 16;

tokio::task::spawn_blocking(move || {
let dfvk_res =
scan_block(network, &block, sapling_tree_size, &dfvks).map_err(|e| eyre!(e))?;
let ivk_res =
scan_block(network, &block, sapling_tree_size, &ivks).map_err(|e| eyre!(e))?;
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

let dfvk_res = scanned_block_to_db_result(dfvk_res);
let ivk_res = scanned_block_to_db_result(ivk_res);

storage.add_sapling_result(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_result(sapling_key, height, ivk_res);

Ok::<_, Report>(())
})
.wait_for_panics()
.await?;
}

height = height
.next()
.expect("a valid blockchain never reaches the max height");
}
}

/// Returns transactions belonging to the given `ScanningKey`.
/// Returns transactions belonging to the given `ScanningKey`. This list of keys should come from
/// a single configured `SaplingScanningKey`.
///
/// # Performance / Hangs
///
Expand All @@ -88,9 +191,9 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
/// - Add prior block metadata once we have access to Zebra's state.
pub fn scan_block<K: ScanningKey>(
network: Network,
block: Arc<Block>,
block: &Arc<Block>,
sapling_tree_size: u32,
scanning_key: &K,
scanning_keys: &[K],
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<ScannedBlock<K::Nf>, ScanError> {
// TODO: Implement a check that returns early when the block height is below the Sapling
// activation height.
Expand All @@ -105,29 +208,61 @@ pub fn scan_block<K: ScanningKey>(

// Use a dummy `AccountId` as we don't use accounts yet.
let dummy_account = AccountId::from(0);

// We only support scanning one key and one block per function call for now.
let scanning_keys = vec![(&dummy_account, scanning_key)];
let scanning_keys: Vec<_> = scanning_keys
.iter()
.map(|key| (&dummy_account, key))
.collect();

zcash_client_backend::scanning::scan_block(
&network,
block_to_compact(block, chain_metadata),
&scanning_keys,
scanning_keys.as_slice(),
// Ignore whether notes are change from a viewer's own spends for now.
&[],
// Ignore previous blocks for now.
None,
)
}

/// Parses a Zebra-format Sapling scanning key into the key types accepted by `scan_block()`.
///
/// Only extended full viewing keys are accepted for now. For testing purposes, the result
/// contains both the key's diversifiable full viewing key, and the `SaplingIvk`s derived from
/// it for the external and internal scopes.
///
/// # Errors
///
/// Returns an error if `sapling_key` can't be decoded as an extended full viewing key using
/// the human-readable prefix selected for `network`.
///
/// TODO: work out what string format is used for SaplingIvk, if any, and support it here
///       performance: stop returning both the dfvk and ivk for the same key
pub fn sapling_key_to_scan_block_keys(
    sapling_key: &SaplingScanningKey,
    network: Network,
) -> Result<(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>), Report> {
    // Assume custom testnets have the same HRP
    //
    // TODO: add the regtest HRP here
    let hrp = match network.is_a_test_network() {
        true => testnet::HRP_SAPLING_EXTENDED_FULL_VIEWING_KEY,
        false => mainnet::HRP_SAPLING_EXTENDED_FULL_VIEWING_KEY,
    };

    let dfvk = decode_extended_full_viewing_key(hrp, sapling_key)
        .map_err(|e| eyre!(e))?
        .to_diversifiable_full_viewing_key();

    // Just return all the keys for now, so we can be sure our code supports them.
    let external_ivk = dfvk.to_ivk(Scope::External);
    let internal_ivk = dfvk.to_ivk(Scope::Internal);

    Ok((vec![dfvk], vec![external_ivk, internal_ivk]))
}

/// Converts a Zebra block and its chain metadata into a compact block.
pub fn block_to_compact(block: Arc<Block>, chain_metadata: ChainMetadata) -> CompactBlock {
pub fn block_to_compact(block: &Arc<Block>, chain_metadata: ChainMetadata) -> CompactBlock {
CompactBlock {
height: block
.coinbase_height()
.expect("verified block should have a valid height")
.0
.into(),
// TODO: performance: look up the block hash from the state rather than recalculating it
hash: block.hash().bytes_in_display_order().to_vec(),
prev_hash: block
.header
Expand Down Expand Up @@ -164,6 +299,7 @@ fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
index: index
.try_into()
.expect("tx index in block should fit in u64"),
// TODO: performance: look up the tx hash from the state rather than recalculating it
hash: tx.hash().bytes_in_display_order().to_vec(),

// `fee` is not checked by the `scan_block` function. It is allowed to be unset.
Expand Down Expand Up @@ -202,3 +338,12 @@ fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
actions: vec![],
}
}

/// Builds the scanner database results for a scanned block.
///
/// Each transaction reported by `scanned_block` contributes one entry: its transaction ID,
/// converted to a Zebra transaction hash from display-order bytes. The entries keep the order
/// of `scanned_block.transactions()`.
fn scanned_block_to_db_result<Nf>(scanned_block: ScannedBlock<Nf>) -> Vec<SaplingScannedResult> {
    let wallet_txs = scanned_block.transactions();
    let mut results: Vec<SaplingScannedResult> = Vec::with_capacity(wallet_txs.len());

    for wallet_tx in wallet_txs {
        let tx_hash = transaction::Hash::from_bytes_in_display_order(wallet_tx.txid.as_ref());
        results.push(tx_hash);
    }

    results
}
Loading
Loading