Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change(scan): Store one transaction ID per database row, to make queries easier #8062

Merged
merged 8 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! The scanner task and scanning APIs.

use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
};

use color_eyre::{eyre::eyre, Report};
use itertools::Itertools;
Expand Down Expand Up @@ -29,7 +33,7 @@ use zebra_chain::{
serialization::ZcashSerialize,
transaction::Transaction,
};
use zebra_state::{ChainTipChange, SaplingScannedResult};
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};

use crate::storage::{SaplingScanningKey, Storage};

Expand Down Expand Up @@ -207,8 +211,8 @@ pub async fn scan_height_and_store_results(
let dfvk_res = scanned_block_to_db_result(dfvk_res);
let ivk_res = scanned_block_to_db_result(ivk_res);

storage.add_sapling_result(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_result(sapling_key, height, ivk_res);
storage.add_sapling_results(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_results(sapling_key, height, ivk_res);

Ok::<_, Report>(())
})
Expand Down Expand Up @@ -385,10 +389,17 @@ fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
}

/// Convert a scanned block to a list of scanner database results.
fn scanned_block_to_db_result<Nf>(scanned_block: ScannedBlock<Nf>) -> Vec<SaplingScannedResult> {
fn scanned_block_to_db_result<Nf>(
scanned_block: ScannedBlock<Nf>,
) -> BTreeMap<TransactionIndex, SaplingScannedResult> {
scanned_block
.transactions()
.iter()
.map(|tx| SaplingScannedResult::from(tx.txid.as_ref()))
.map(|tx| {
(
TransactionIndex::from_usize(tx.index),
SaplingScannedResult::from(tx.txid.as_ref()),
)
})
.collect()
}
42 changes: 23 additions & 19 deletions zebra-scan/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use zebra_chain::{
block::Height,
parameters::{Network, NetworkUpgrade},
};
use zebra_state::{SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex};
use zebra_state::{
SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, TransactionIndex, TransactionLocation,
};

use crate::config::Config;

Expand Down Expand Up @@ -56,8 +58,8 @@ impl Storage {
pub fn new(config: &Config, network: Network) -> Self {
let mut storage = Self::new_db(config, network);

for (key, birthday) in config.sapling_keys_to_scan.iter() {
storage.add_sapling_key(key.clone(), Some(zebra_chain::block::Height(*birthday)));
for (sapling_key, birthday) in config.sapling_keys_to_scan.iter() {
storage.add_sapling_key(sapling_key, Some(zebra_chain::block::Height(*birthday)));
}

storage
Expand All @@ -69,12 +71,12 @@ impl Storage {
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_key(&mut self, key: SaplingScanningKey, birthday: Option<Height>) {
pub fn add_sapling_key(&mut self, sapling_key: &SaplingScanningKey, birthday: Option<Height>) {
// It's ok to write some keys and not others during shutdown, so each key can get its own
// batch. (They will be re-written on startup anyway.)
let mut batch = ScannerWriteBatch::default();

batch.insert_sapling_key(self, key, birthday);
batch.insert_sapling_key(self, sapling_key, birthday);

self.write_batch(batch);
}
Expand All @@ -91,33 +93,35 @@ impl Storage {
self.sapling_keys_and_birthday_heights()
}

/// Add a sapling result to the storage.
/// Add the sapling results for `height` to the storage.
///
/// # Performance / Hangs
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_result(
pub fn add_sapling_results(
&mut self,
sapling_key: SaplingScanningKey,
height: Height,
sapling_result: Vec<SaplingScannedResult>,
sapling_results: BTreeMap<TransactionIndex, SaplingScannedResult>,
) {
// It's ok to write some results and not others during shutdown, so each result can get its
// own batch. (They will be re-scanned on startup anyway.)
// We skip heights that have one or more results, so the results for each height must be
// in a single batch.
let mut batch = ScannerWriteBatch::default();

let index = SaplingScannedDatabaseIndex {
sapling_key,
height,
};
for (index, sapling_result) in sapling_results {
let index = SaplingScannedDatabaseIndex {
sapling_key: sapling_key.clone(),
tx_loc: TransactionLocation::from_parts(height, index),
};

let entry = SaplingScannedDatabaseEntry {
index,
value: sapling_result,
};
let entry = SaplingScannedDatabaseEntry {
index,
value: Some(sapling_result),
};

batch.insert_sapling_result(self, entry);
batch.insert_sapling_result(self, entry);
}

self.write_batch(batch);
}
Expand Down
8 changes: 6 additions & 2 deletions zebra-scan/src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub const SCANNER_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
// TODO: add Orchard support
];

/// The major version number of the scanner database. This must be updated whenever the database
/// format changes.
const SCANNER_DATABASE_FORMAT_MAJOR_VERSION: u64 = 1;

impl Storage {
// Creation

Expand Down Expand Up @@ -96,8 +100,8 @@ impl Storage {

/// The database format version in the running scanner code.
pub fn database_format_version_in_code() -> Version {
// TODO: implement scanner database versioning
Version::new(0, 0, 0)
// TODO: implement in-place scanner database format upgrades
Version::new(SCANNER_DATABASE_FORMAT_MAJOR_VERSION, 0, 0)
}

/// Check for panics in code running in spawned threads.
Expand Down
115 changes: 79 additions & 36 deletions zebra-scan/src/storage/db/sapling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,34 @@
//!
//! | name | key | value |
//! |------------------|-------------------------------|--------------------------|
//! | `sapling_tx_ids` | `SaplingScannedDatabaseIndex` | `Vec<transaction::Hash>` |
//! | `sapling_tx_ids` | `SaplingScannedDatabaseIndex` | `Option<SaplingScannedResult>` |
//!
//! And types:
//! SaplingScannedDatabaseIndex = `SaplingScanningKey` | `Height`
//! `SaplingScannedResult`: same as `transaction::Hash`, but with bytes in display order.
//! `None` is stored as a zero-length array of bytes.
//!
//! `SaplingScannedDatabaseIndex` = `SaplingScanningKey` | `TransactionLocation`
//! `TransactionLocation` = `Height` | `TransactionIndex`
//!
//! This format allows us to efficiently find all the results for each key, and the latest height
//! for each key.
//!
//! If there are no results for a height, we store an empty list of results. This allows is to scan
//! each key from the next height after we restart. We also use this mechanism to store key
//! birthday heights, by storing the height before the birthday as the "last scanned" block.
//! If there are no results for a height, we store `None` as the result for the coinbase
//! transaction. This allows is to scan each key from the next height after we restart. We also use
//! this mechanism to store key birthday heights, by storing the height before the birthday as the
//! "last scanned" block.

use std::{
collections::{BTreeMap, HashMap},
ops::RangeBounds,
};

use std::collections::{BTreeMap, HashMap};
use itertools::Itertools;

use zebra_chain::block::Height;
use zebra_state::{
AsColumnFamilyRef, ReadDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex,
SaplingScannedResult, SaplingScanningKey, WriteDisk,
SaplingScannedResult, SaplingScanningKey, TransactionIndex, WriteDisk,
};

use crate::storage::Storage;
Expand All @@ -36,16 +46,29 @@ pub const SAPLING_TX_IDS: &str = "sapling_tx_ids";
impl Storage {
// Reading Sapling database entries

/// Returns the results for a specific key and block height.
/// Returns the result for a specific database index (key, block height, transaction index).
//
// TODO: add tests for this method
pub fn sapling_result_for_key_and_block(
pub fn sapling_result_for_index(
&self,
index: &SaplingScannedDatabaseIndex,
) -> Vec<SaplingScannedResult> {
self.db
.zs_get(&self.sapling_tx_ids_cf(), &index)
.unwrap_or_default()
) -> Option<SaplingScannedResult> {
self.db.zs_get(&self.sapling_tx_ids_cf(), &index)
}

/// Returns the results for a specific key and block height.
pub fn sapling_results_for_key_and_height(
&self,
sapling_key: &SaplingScanningKey,
height: Height,
) -> BTreeMap<TransactionIndex, Option<SaplingScannedResult>> {
let kh_min = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
let kh_max = SaplingScannedDatabaseIndex::max_for_key_and_height(sapling_key, height);

self.sapling_results_in_range(kh_min..=kh_max)
.into_iter()
.map(|(result_index, txid)| (result_index.tx_loc.index, txid))
.collect()
}

/// Returns all the results for a specific key, indexed by height.
Expand All @@ -56,10 +79,19 @@ impl Storage {
let k_min = SaplingScannedDatabaseIndex::min_for_key(sapling_key);
let k_max = SaplingScannedDatabaseIndex::max_for_key(sapling_key);

self.db
.zs_items_in_range_ordered(&self.sapling_tx_ids_cf(), k_min..=k_max)
// Get an iterator of individual transaction results, and turn it into a HashMap by height
let results: HashMap<Height, Vec<Option<SaplingScannedResult>>> = self
.sapling_results_in_range(k_min..=k_max)
.into_iter()
.map(|(index, result)| (index.tx_loc.height, result))
.into_group_map();

// But we want Vec<SaplingScannedResult>, with empty Vecs instead of [None, None, ...]
results
.into_iter()
.map(|(index, result)| (index.height, result))
.map(|(index, vector)| -> (Height, Vec<SaplingScannedResult>) {
(index, vector.into_iter().flatten().collect())
})
.collect()
}

Expand All @@ -85,16 +117,16 @@ impl Storage {
break;
};

let (index, results): (_, Vec<SaplingScannedResult>) = entry;
let SaplingScannedDatabaseIndex {
sapling_key,
mut height,
} = index;
let sapling_key = entry.0.sapling_key;
let mut height = entry.0.tx_loc.height;
let _first_result: Option<SaplingScannedResult> = entry.1;

// If there are no results, then it's a "skip up to height" marker, and the birthday
// height is the next height. If there are some results, it's the actual birthday
// height.
if results.is_empty() {
let height_results = self.sapling_results_for_key_and_height(&sapling_key, height);

// If there are no results for this block, then it's a "skip up to height" marker, and
// the birthday height is the next height. If there are some results, it's the actual
// birthday height.
if height_results.values().all(Option::is_none) {
height = height
.next()
.expect("results should only be stored for validated block heights");
Expand All @@ -109,6 +141,17 @@ impl Storage {
keys
}

/// Returns the Sapling indexes and results in the supplied range.
///
/// Convenience method for accessing raw data with the correct types.
fn sapling_results_in_range(
&self,
range: impl RangeBounds<SaplingScannedDatabaseIndex>,
) -> BTreeMap<SaplingScannedDatabaseIndex, Option<SaplingScannedResult>> {
self.db
.zs_items_in_range_ordered(&self.sapling_tx_ids_cf(), range)
}

// Column family convenience methods

/// Returns a handle to the `sapling_tx_ids` column family.
Expand All @@ -122,7 +165,7 @@ impl Storage {

/// Write `batch` to the database for this storage.
pub(crate) fn write_batch(&self, batch: ScannerWriteBatch) {
// Just panic on errors for now
// Just panic on errors for now.
self.db
.write_batch(batch.0)
.expect("unexpected database error")
Expand All @@ -147,12 +190,15 @@ impl ScannerWriteBatch {
/// Insert a sapling scanning `key`, and mark all heights before `birthday_height` so they
/// won't be scanned.
///
/// If a result already exists for the height before the birthday, it is replaced with an empty
/// result.
/// If a result already exists for the coinbase transaction at the height before the birthday,
/// it is replaced with an empty result. This can happen if the user increases the birthday
/// height.
///
/// TODO: ignore incorrect changes to birthday heights
pub(crate) fn insert_sapling_key(
&mut self,
storage: &Storage,
sapling_key: SaplingScanningKey,
sapling_key: &SaplingScanningKey,
birthday_height: Option<Height>,
) {
let min_birthday_height = storage.min_sapling_birthday_height();
Expand All @@ -162,13 +208,10 @@ impl ScannerWriteBatch {
.unwrap_or(min_birthday_height)
.max(min_birthday_height);
// And we want to skip up to the height before it.
let skip_up_to_height = birthday_height.previous().unwrap_or(Height(0));

let index = SaplingScannedDatabaseIndex {
sapling_key,
height: skip_up_to_height,
};
let skip_up_to_height = birthday_height.previous().unwrap_or(Height::MIN);

self.zs_insert(&storage.sapling_tx_ids_cf(), index, Vec::new());
let index =
SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
}
}
10 changes: 7 additions & 3 deletions zebra-scan/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use zebra_chain::{
transparent::{CoinbaseData, Input},
work::{difficulty::CompactDifficulty, equihash::Solution},
};
use zebra_state::SaplingScannedResult;
use zebra_state::{SaplingScannedResult, TransactionIndex};

use crate::{
config::Config,
Expand Down Expand Up @@ -189,7 +189,7 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> {
let mut s = crate::storage::Storage::new(&Config::ephemeral(), Network::Mainnet);

// Insert the generated key to the database
s.add_sapling_key(key_to_be_stored.clone(), None);
s.add_sapling_key(&key_to_be_stored, None);

// Check key was added
assert_eq!(s.sapling_keys().len(), 1);
Expand All @@ -210,7 +210,11 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> {
let result = SaplingScannedResult::from(result.transactions()[0].txid.as_ref());

// Add result to database
s.add_sapling_result(key_to_be_stored.clone(), Height(1), vec![result]);
s.add_sapling_results(
key_to_be_stored.clone(),
Height(1),
[(TransactionIndex::from_usize(0), result)].into(),
);

// Check the result was added
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion zebra-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
check, init, spawn_init,
watch_receiver::WatchReceiver,
OutputIndex, OutputLocation, TransactionLocation,
OutputIndex, OutputLocation, TransactionIndex, TransactionLocation,
};

#[cfg(feature = "shielded-scan")]
Expand Down
Loading
Loading