Skip to content

Commit

Permalink
feat(scanner): Restart scanning where left (#8080)
Browse files Browse the repository at this point in the history
* start scanner where it was left

* fix tests

* add a `scan_start_where_left` test

* refactor a log msg

* fix some comments

* remove function

* fix doc comment

* clippy

* fix `sapling_keys_and_last_scanned_heights()`

* simplify start height

* i went too far, revert some changes back

* change log info to every 10k blocks

* fix build

* Update height snapshot code and check last height is consistent

* Add strictly before and strictly after database key gets

* Move to the previous key using strictly before ops

* Assert that keys are only inserted once

* Update the index in each loop

* Update snapshots

* Remove debugging code

* start scanning at min available height

---------

Co-authored-by: teor <teor@riseup.net>
  • Loading branch information
oxarbitrage and teor2345 authored Dec 13, 2023
1 parent ac72c2b commit 92758a0
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 111 deletions.
5 changes: 5 additions & 0 deletions zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ impl Config {
pub fn db_config(&self) -> &DbConfig {
&self.db_config
}

/// Returns the database-specific config as mutable.
pub fn db_config_mut(&mut self) -> &mut DbConfig {
&mut self.db_config
}
}
52 changes: 25 additions & 27 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const INITIAL_WAIT: Duration = Duration::from_secs(15);
const CHECK_INTERVAL: Duration = Duration::from_secs(30);

/// We log an info log with progress after this many blocks.
const INFO_LOG_INTERVAL: u32 = 100_000;
const INFO_LOG_INTERVAL: u32 = 10_000;

/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
/// `storage`, and then writes the results to `storage`.
Expand All @@ -64,21 +64,21 @@ pub async fn start(
storage: Storage,
) -> Result<(), Report> {
let network = storage.network();
let mut height = storage.min_sapling_birthday_height();

// Read keys from the storage on disk, which can block async execution.
let key_storage = storage.clone();
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
let key_heights = tokio::task::spawn_blocking(move || key_storage.sapling_keys_last_heights())
.wait_for_panics()
.await;
let key_birthdays = Arc::new(key_birthdays);
let key_heights = Arc::new(key_heights);

let mut height = get_min_height(&key_heights).unwrap_or(storage.min_sapling_birthday_height());

// Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys.
let parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
> = key_birthdays
> = key_heights
.keys()
.map(|key| {
let parsed_keys = sapling_key_to_scan_block_keys(key, network)?;
Expand All @@ -96,7 +96,7 @@ pub async fn start(
state.clone(),
chain_tip_change.clone(),
storage.clone(),
key_birthdays.clone(),
key_heights.clone(),
parsed_keys.clone(),
)
.await?;
Expand Down Expand Up @@ -125,7 +125,7 @@ pub async fn scan_height_and_store_results(
mut state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
key_birthdays: Arc<HashMap<SaplingScanningKey, Height>>,
key_last_scanned_heights: Arc<HashMap<SaplingScanningKey, Height>>,
parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
Expand All @@ -135,19 +135,7 @@ pub async fn scan_height_and_store_results(
// Only log at info level every 100,000 blocks.
//
// TODO: also log progress every 5 minutes once we reach the tip?
let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;

// TODO: add debug logs?
if is_info_log {
info!(
"Scanning the blockchain: now at block {:?}, current tip {:?}",
height,
chain_tip_change
.latest_chain_tip()
.best_tip_height_and_hash(),
);
}
let is_info_log = height.0 % INFO_LOG_INTERVAL == 0;

// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
Expand All @@ -168,24 +156,29 @@ pub async fn scan_height_and_store_results(
// Scan it with all the keys.
//
// TODO: scan each key in parallel (after MVP?)
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
for (key_num, (sapling_key, last_scanned_height)) in key_last_scanned_heights.iter().enumerate()
{
// Only scan what was not scanned for each key
if height <= *last_scanned_height {
continue;
}

// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
if is_info_log {
info!(
"Scanning the blockchain for key {}, started at block {:?}",
key_num, birthday_height,
"Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
key_num, last_scanned_height.next().expect("height is not maximum").as_usize(),
height.as_usize(),
chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
);
}

// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();

// Scan the block, which blocks async execution until the scan is complete.
//
// TODO: skip scanning before birthday height (#8022)
let sapling_key = sapling_key.clone();
let block = block.clone();
let mut storage = storage.clone();
Expand Down Expand Up @@ -403,3 +396,8 @@ fn scanned_block_to_db_result<Nf>(
})
.collect()
}

/// Get the minimal height available in a key_heights map.
fn get_min_height(map: &HashMap<String, Height>) -> Option<Height> {
map.values().cloned().min()
}
21 changes: 16 additions & 5 deletions zebra-scan/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub use db::{SaplingScannedResult, SaplingScanningKey};

use self::db::ScannerWriteBatch;

/// We insert an empty results entry to the database every this interval for each stored key,
/// so we can track progress.
const INSERT_CONTROL_INTERVAL: u32 = 1_000;

/// Store key info and results of the scan.
///
/// `rocksdb` allows concurrent writes through a shared reference,
Expand Down Expand Up @@ -87,16 +91,14 @@ impl Storage {
self.write_batch(batch);
}

/// Returns all the keys and their birthdays.
///
/// Birthdays are adjusted to sapling activation if they are too low or missing.
/// Returns all the keys and their last scanned heights.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_keys(&self) -> HashMap<SaplingScanningKey, Height> {
self.sapling_keys_and_birthday_heights()
pub fn sapling_keys_last_heights(&self) -> HashMap<SaplingScanningKey, Height> {
self.sapling_keys_and_last_scanned_heights()
}

/// Add the sapling results for `height` to the storage. The results can be any map of
Expand All @@ -116,6 +118,10 @@ impl Storage {
// in a single batch.
let mut batch = ScannerWriteBatch::default();

// Every `INSERT_CONTROL_INTERVAL` we add a new entry to the scanner database for each key
// so we can track progress made in the last interval even if no transaction was yet found.
let is_control_time = height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();

for (index, sapling_result) in sapling_results {
let index = SaplingScannedDatabaseIndex {
sapling_key: sapling_key.clone(),
Expand All @@ -130,6 +136,11 @@ impl Storage {
batch.insert_sapling_result(self, entry);
}

// Add tracking entry for key.
if is_control_time {
batch.insert_sapling_height(self, sapling_key, height);
}

self.write_batch(batch);
}

Expand Down
12 changes: 11 additions & 1 deletion zebra-scan/src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,17 @@ impl Storage {

let new_storage = Self { db };

// TODO: report the last scanned height here?
// Report where we are for each key in the database.
let keys = new_storage.sapling_keys_last_heights();
for (key_num, (_key, height)) in keys.iter().enumerate() {
tracing::info!(
"Last scanned height for key number {} is {}, resuming at {}",
key_num,
height.as_usize(),
height.next().expect("height is not maximum").as_usize(),
);
}

tracing::info!("loaded Zebra scanner cache");

new_storage
Expand Down
66 changes: 32 additions & 34 deletions zebra-scan/src/storage/db/sapling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,50 +97,37 @@ impl Storage {
.collect()
}

/// Returns all the keys and their birthday heights.
pub fn sapling_keys_and_birthday_heights(&self) -> HashMap<SaplingScanningKey, Height> {
// This code is a bit complex because we don't have a separate column family for keys
// and their birthday heights.
//
// TODO: make a separate column family after the MVP.

/// Returns all the keys and their last scanned heights.
pub fn sapling_keys_and_last_scanned_heights(&self) -> HashMap<SaplingScanningKey, Height> {
let sapling_tx_ids = self.sapling_tx_ids_cf();
let mut keys = HashMap::new();

// The minimum key is invalid or a dummy key, so we will never have an entry for it.
let mut find_next_key_index = SaplingScannedDatabaseIndex::min();
let mut last_stored_record: Option<(
SaplingScannedDatabaseIndex,
Option<SaplingScannedResult>,
)> = self.db.zs_last_key_value(&sapling_tx_ids);

loop {
// Find the next key, and the first height we have for it.
let Some(entry) = self
.db
.zs_next_key_value_from(&sapling_tx_ids, &find_next_key_index)
else {
break;
let Some((mut last_stored_record_index, _result)) = last_stored_record else {
return keys;
};

let sapling_key = entry.0.sapling_key;
let mut height = entry.0.tx_loc.height;
let _first_result: Option<SaplingScannedResult> = entry.1;

let height_results = self.sapling_results_for_key_and_height(&sapling_key, height);

// If there are no results for this block, then it's a "skip up to height" marker, and
// the birthday height is the next height. If there are some results, it's the actual
// birthday height.
if height_results.values().all(Option::is_none) {
height = height
.next()
.expect("results should only be stored for validated block heights");
}
let sapling_key = last_stored_record_index.sapling_key.clone();
let height = last_stored_record_index.tx_loc.height;

keys.insert(sapling_key.clone(), height);
let prev_height = keys.insert(sapling_key.clone(), height);
assert_eq!(
prev_height, None,
"unexpected duplicate key: keys must only be inserted once\
last_stored_record_index: {last_stored_record_index:?}",
);

// Skip all the results before the next key.
find_next_key_index = SaplingScannedDatabaseIndex::max_for_key(&sapling_key);
// Skip all the results until the next key.
last_stored_record_index = SaplingScannedDatabaseIndex::min_for_key(&sapling_key);
last_stored_record = self
.db
.zs_prev_key_value_strictly_before(&sapling_tx_ids, &last_stored_record_index);
}

keys
}

/// Returns the Sapling indexes and results in the supplied range.
Expand Down Expand Up @@ -216,4 +203,15 @@ impl ScannerWriteBatch {
SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
}

/// Insert sapling height with no results
pub(crate) fn insert_sapling_height(
&mut self,
storage: &Storage,
sapling_key: &SaplingScanningKey,
height: Height,
) {
let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
}
}
18 changes: 7 additions & 11 deletions zebra-scan/src/storage/db/tests/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,30 +147,26 @@ fn snapshot_raw_rocksdb_column_family_data(db: &ScannerDb, original_cf_names: &[
/// Snapshot typed scanner result data using high-level storage methods,
/// using `cargo insta` and RON serialization.
fn snapshot_typed_result_data(storage: &Storage) {
// TODO: snapshot the latest scanned heights after PR #8080 merges
//insta::assert_ron_snapshot!("latest_heights", latest_scanned_heights);

// Make sure the typed key format doesn't accidentally change.
//
// TODO: update this after PR #8080
let sapling_keys_and_birthday_heights = storage.sapling_keys();
let sapling_keys_last_heights = storage.sapling_keys_last_heights();

// HashMap has an unstable order across Rust releases, so we need to sort it here.
insta::assert_ron_snapshot!(
"sapling_keys",
sapling_keys_and_birthday_heights,
sapling_keys_last_heights,
{
"." => insta::sorted_redaction()
}
);

// HashMap has an unstable order across Rust releases, so we need to sort it here as well.
for (key_index, (sapling_key, _birthday_height)) in sapling_keys_and_birthday_heights
.iter()
.sorted()
.enumerate()
for (key_index, (sapling_key, last_height)) in
sapling_keys_last_heights.iter().sorted().enumerate()
{
let sapling_results = storage.sapling_results(sapling_key);

assert_eq!(sapling_results.keys().max(), Some(last_height));

// Check internal database method consistency
for (height, results) in sapling_results.iter() {
let sapling_index_and_results =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419200),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(279999),
"zxviewsfake": Height(999999),
}
Loading

0 comments on commit 92758a0

Please sign in to comment.