Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into issue8205
Browse files Browse the repository at this point in the history
  • Loading branch information
oxarbitrage committed Feb 1, 2024
2 parents 80a429d + 2bf16a3 commit b6d7003
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 31 deletions.
64 changes: 62 additions & 2 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
//! `zebra_scan::service::ScanService` request types.

use crate::BoxError;

/// The maximum number of keys that may be included in a request to the scan service
const MAX_REQUEST_KEYS: usize = 1000;

#[derive(Debug)]
/// Request types for `zebra_scan::service::ScanService`
pub enum Request {
Expand All @@ -21,6 +26,61 @@ pub enum Request {
/// TODO: Accept `KeyHash`es and return a channel receiver
SubscribeResults(Vec<()>),

/// TODO: Accept `KeyHash`es and return transaction ids
ClearResults(Vec<()>),
/// Clear the results for a set of viewing keys
ClearResults(Vec<String>),
}

impl Request {
    /// Validates the request data, returning an error if any check fails.
    ///
    /// Currently this only validates the number of keys in key-carrying variants.
    pub fn check(&self) -> Result<(), BoxError> {
        self.check_num_keys()
    }

    /// Checks that requests which include keys have a valid number of keys.
    ///
    /// Variants without keys always pass this check.
    fn check_num_keys(&self) -> Result<(), BoxError> {
        if let Request::DeleteKeys(keys) | Request::ClearResults(keys) = self {
            if keys.is_empty() || keys.len() > MAX_REQUEST_KEYS {
                return Err(
                    format!("request must include between 1 and {MAX_REQUEST_KEYS} keys").into(),
                );
            }
        }

        Ok(())
    }
}

#[test]
fn test_check_num_keys() {
    // One more key than the request limit allows.
    let fake_keys = vec![String::new(); MAX_REQUEST_KEYS + 1];

    // Requests that should be rejected: empty key lists and over-long key lists.
    let bad_requests = [
        Request::DeleteKeys(Vec::new()),
        Request::DeleteKeys(fake_keys.clone()),
        Request::ClearResults(Vec::new()),
        Request::ClearResults(fake_keys),
    ];

    // Requests with a key count inside the accepted range.
    let valid_requests = [
        Request::DeleteKeys(vec![String::new()]),
        Request::ClearResults(vec![String::new()]),
    ];

    for request in bad_requests {
        let error = request.check().expect_err("check should return an error");

        assert_eq!(
            format!("request must include between 1 and {MAX_REQUEST_KEYS} keys"),
            error.to_string(),
            "check_num_keys should return an error because there are too many keys"
        );
    }

    for request in valid_requests {
        request.check().expect("check should return Ok(())");
    }
}
3 changes: 3 additions & 0 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub enum Response {
/// Response to DeleteKeys request
DeletedKeys,

/// Response to ClearResults request
ClearedResults,

/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Hash>>),
}
38 changes: 18 additions & 20 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ pub struct ScanService {
/// A timeout applied to `DeleteKeys` requests.
const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);

/// The maximum number of keys that may be included in a request to the scan service
const MAX_REQUEST_KEYS: usize = 1000;

impl ScanService {
/// Create a new [`ScanService`].
pub fn new(
Expand Down Expand Up @@ -76,6 +73,10 @@ impl Service<Request> for ScanService {
}

fn call(&mut self, req: Request) -> Self::Future {
if let Err(error) = req.check() {
return async move { Err(error) }.boxed();
}

match req {
Request::Info => {
let db = self.db.clone();
Expand Down Expand Up @@ -103,13 +104,6 @@ impl Service<Request> for ScanService {
let mut scan_task = self.scan_task.clone();

return async move {
if keys.len() > MAX_REQUEST_KEYS {
return Err(format!(
"maximum number of keys per request is {MAX_REQUEST_KEYS}"
)
.into());
}

// Wait for a message to confirm that the scan task has removed the key up to `DELETE_KEY_TIMEOUT`
let remove_keys_result =
tokio::time::timeout(DELETE_KEY_TIMEOUT, scan_task.remove_keys(&keys)?)
Expand All @@ -119,7 +113,7 @@ impl Service<Request> for ScanService {
// Delete the key from the database after either confirmation that it's been removed from the scan task, or
// waiting `DELETE_KEY_TIMEOUT`.
let delete_key_task = tokio::task::spawn_blocking(move || {
db.delete_sapling_results(keys);
db.delete_sapling_keys(keys);
});

// Return timeout errors or `RecvError`s, or wait for the key to be deleted from the database.
Expand All @@ -135,13 +129,6 @@ impl Service<Request> for ScanService {
let db = self.db.clone();

return async move {
if keys.len() > MAX_REQUEST_KEYS {
return Err(format!(
"maximum number of keys per request is {MAX_REQUEST_KEYS}"
)
.into());
}

let mut final_result = BTreeMap::new();
for key in keys {
let db = db.clone();
Expand Down Expand Up @@ -169,8 +156,19 @@ impl Service<Request> for ScanService {
// TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller
}

Request::ClearResults(_key_hashes) => {
// TODO: clear results for these keys from db
Request::ClearResults(keys) => {
let mut db = self.db.clone();

return async move {
// Clear results from db for the provided `keys`
tokio::task::spawn_blocking(move || {
db.delete_sapling_results(keys);
})
.await?;

Ok(Response::ClearedResults)
}
.boxed();
}
}

Expand Down
55 changes: 55 additions & 0 deletions zebra-scan/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,61 @@ pub async fn scan_service_deletes_keys_correctly() -> Result<()> {
Ok(())
}

/// Tests that the scan service clears stored results correctly
#[tokio::test]
pub async fn scan_service_clears_results_correctly() -> Result<()> {
    let mut db = new_test_storage(Network::Mainnet);

    let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();

    // Store fake results for this key at several heights.
    for fake_result_height in [Height::MIN, Height(1), Height::MAX] {
        db.insert_sapling_results(
            &zec_pages_sapling_efvk,
            fake_result_height,
            fake_sapling_results([
                TransactionIndex::MIN,
                TransactionIndex::from_index(40),
                TransactionIndex::MAX,
            ]),
        );
    }

    assert!(
        !db.sapling_results(&zec_pages_sapling_efvk).is_empty(),
        "there should be some results for this key in the db"
    );

    let (mut scan_service, _cmd_receiver) = ScanService::new_with_mock_scanner(db.clone());

    // Ask the service to clear all results for the key.
    let response = scan_service
        .ready()
        .await
        .map_err(|err| eyre!(err))?
        .call(Request::ClearResults(vec![zec_pages_sapling_efvk.clone()]))
        .await
        .map_err(|err| eyre!(err))?;

    assert!(
        matches!(response, Response::ClearedResults),
        "scan service returned unexpected response variant"
    );

    assert_eq!(
        db.sapling_results(&zec_pages_sapling_efvk).len(),
        1,
        "all results for this key should have been deleted, one empty entry should remain"
    );

    // The remaining entry marks the key's height but must hold no results.
    for (_height, result) in db.sapling_results(&zec_pages_sapling_efvk) {
        assert!(
            result.is_empty(),
            "there should be no results for this entry in the db"
        );
    }

    Ok(())
}

/// Tests that results for key are returned correctly
#[tokio::test]
pub async fn scan_service_get_results_for_key_correctly() -> Result<()> {
Expand Down
44 changes: 36 additions & 8 deletions zebra-scan/src/storage/db/sapling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,24 @@ impl Storage {
.expect("unexpected database write failure");
}

/// Delete the given sapling keys and their results, if they exist.
pub(crate) fn delete_sapling_keys(&mut self, keys: Vec<SaplingScanningKey>) {
    // Stage all deletions in one batch so they are committed atomically.
    let batch = self.sapling_tx_ids_cf().new_batch_for_writing();

    batch
        .delete_sapling_keys(keys)
        .write_batch()
        .expect("unexpected database write failure");
}

/// Delete the results of sapling scanning `keys`, if they exist
pub(crate) fn delete_sapling_results(&mut self, keys: Vec<SaplingScanningKey>) {
let mut batch = self.sapling_tx_ids_cf().new_batch_for_writing();
let mut batch = self
.sapling_tx_ids_cf()
.new_batch_for_writing()
.delete_sapling_keys(keys.clone());

for key in &keys {
let from = SaplingScannedDatabaseIndex::min_for_key(key);
let until_strictly_before = SaplingScannedDatabaseIndex::max_for_key(key);

batch = batch
.zs_delete_range(&from, &until_strictly_before)
// TODO: convert zs_delete_range() to take std::ops::RangeBounds
.zs_delete(&until_strictly_before);
batch = batch.insert_sapling_height(key, Height::MIN);
}

batch
Expand All @@ -271,3 +277,25 @@ impl<'cf> InsertSaplingHeight for WriteSaplingTxIdsBatch<'cf> {
self.zs_insert(&index, &None)
}
}

/// Utility trait for deleting sapling keys in a WriteSaplingTxIdsBatch.
trait DeleteSaplingKeys {
    /// Deletes the provided `sapling_keys` and their scan results from the batch,
    /// returning the batch so further writes can be chained.
    //
    // Parameter renamed from `sapling_key` to `sapling_keys` for consistency with
    // the implementation below — it takes a `Vec` of keys, not a single key.
    fn delete_sapling_keys(self, sapling_keys: Vec<SaplingScanningKey>) -> Self;
}

impl<'cf> DeleteSaplingKeys for WriteSaplingTxIdsBatch<'cf> {
    /// Delete sapling keys and their results.
    fn delete_sapling_keys(mut self, sapling_keys: Vec<SaplingScanningKey>) -> Self {
        for key in &sapling_keys {
            let start_index = SaplingScannedDatabaseIndex::min_for_key(key);
            let end_index = SaplingScannedDatabaseIndex::max_for_key(key);

            // The range delete excludes its upper bound, so delete the
            // maximum index for the key with a separate call.
            // TODO: convert zs_delete_range() to take std::ops::RangeBounds
            self = self.zs_delete_range(&start_index, &end_index).zs_delete(&end_index);
        }

        self
    }
}
68 changes: 67 additions & 1 deletion zebra-scan/src/storage/db/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,77 @@ pub fn deletes_keys_and_results_correctly() {
);
}

db.delete_sapling_results(vec![efvk.clone()]);
db.delete_sapling_keys(vec![efvk.clone()]);

assert!(
db.sapling_results(efvk).is_empty(),
"all results for this key should have been deleted"
);
}
}

/// Tests that results are cleared correctly while the keys themselves remain
#[test]
pub fn clears_results_correctly() {
    let mut db = new_test_storage(Network::Mainnet);

    let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();

    // Replace the last letter of the zec_pages efvk
    let fake_efvk = format!(
        "{}t",
        &ZECPAGES_SAPLING_VIEWING_KEY[..ZECPAGES_SAPLING_VIEWING_KEY.len() - 1]
    );

    let efvks = [&zec_pages_sapling_efvk, &fake_efvk];
    let fake_heights = [Height::MIN, Height(1), Height::MAX];
    let fake_transaction_indexes = [
        TransactionIndex::MIN,
        TransactionIndex::from_index(40),
        TransactionIndex::MAX,
    ];

    // Populate fake results for both keys at every fake height.
    for efvk in efvks {
        for height in fake_heights {
            db.insert_sapling_results(efvk, height, fake_sapling_results(fake_transaction_indexes));
        }
    }

    let expected_num_entries = fake_heights.len();
    let expected_num_results_per_entry = fake_transaction_indexes.len();

    for efvk in efvks {
        // Sanity check: the inserted entries and their results are all present.
        let num_entries = db.sapling_results(efvk).len();
        assert_eq!(
            num_entries,
            expected_num_entries,
            "there should be {expected_num_entries} entries for this key in the db"
        );

        for (_height, result) in db.sapling_results(efvk) {
            assert_eq!(
                result.len(),
                expected_num_results_per_entry,
                "there should be {expected_num_results_per_entry} results for this entry in the db"
            );
        }

        // Clear the results for this key only.
        db.delete_sapling_results(vec![efvk.clone()]);

        // A single empty entry should remain to mark the key's height.
        assert_eq!(
            db.sapling_results(efvk).len(),
            1,
            "all results for this key should have been deleted, one empty entry should remain"
        );

        for (_height, result) in db.sapling_results(efvk) {
            assert!(
                result.is_empty(),
                "there should be no results for this entry in the db"
            );
        }
    }
}

0 comments on commit b6d7003

Please sign in to comment.