Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(scan): Implement Results request #8224

Merged
merged 3 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub enum Request {
/// Deletes viewing keys and their results from the database.
DeleteKeys(Vec<String>),

/// TODO: Accept `KeyHash`es and return `Transaction`s
Results(Vec<()>),
/// Accept keys and return transaction data
Results(Vec<String>),

/// TODO: Accept `KeyHash`es and return a channel receiver
SubscribeResults(Vec<()>),
Expand Down
13 changes: 9 additions & 4 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
//! `zebra_scan::service::ScanService` response types.

use std::sync::{mpsc, Arc};
use std::{
collections::BTreeMap,
sync::{mpsc, Arc},
};

use zebra_chain::{block::Height, transaction::Transaction};
use zebra_chain::{block::Height, transaction::Hash};

#[derive(Debug)]
/// Response types for `zebra_scan::service::ScanService`
Expand All @@ -14,7 +17,9 @@ pub enum Response {
},

/// Response to Results request
Results(Vec<Transaction>),
///
/// We use the nested `BTreeMap` so we don't repeat any piece of response data.
Results(BTreeMap<String, BTreeMap<Height, Vec<Hash>>>),

/// Response to DeleteKeys request
DeletedKeys,
Expand All @@ -23,5 +28,5 @@ pub enum Response {
ClearedResults,

/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
SubscribeResults(mpsc::Receiver<Arc<Hash>>),
}
34 changes: 29 additions & 5 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! [`tower::Service`] for zebra-scan.

use std::{future::Future, pin::Pin, task::Poll, time::Duration};
use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Duration};

use futures::future::FutureExt;
use tower::Service;

use zebra_chain::parameters::Network;
use zebra_chain::{parameters::Network, transaction::Hash};

use zebra_state::ChainTipChange;

use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
Expand Down Expand Up @@ -124,8 +125,31 @@ impl Service<Request> for ScanService {
.boxed();
}

Request::Results(_key_hashes) => {
// TODO: read results from db
Request::Results(keys) => {
let db = self.db.clone();

return async move {
let mut final_result = BTreeMap::new();
for key in keys {
let db = db.clone();
let mut heights_and_transactions = BTreeMap::new();
let txs = {
let key = key.clone();
tokio::task::spawn_blocking(move || db.sapling_results_for_key(&key))
}
.await?;
txs.iter().for_each(|(k, v)| {
heights_and_transactions
.entry(*k)
.or_insert_with(Vec::new)
.extend(v.iter().map(|x| Hash::from(*x)));
});
final_result.entry(key).or_insert(heights_and_transactions);
}

Ok(Response::Results(final_result))
}
.boxed();
}

Request::SubscribeResults(_key_hashes) => {
Expand All @@ -148,6 +172,6 @@ impl Service<Request> for ScanService {
}
}

async move { Ok(Response::Results(vec![])) }.boxed()
async move { Ok(Response::Results(BTreeMap::new())) }.boxed()
}
}
76 changes: 76 additions & 0 deletions zebra-scan/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,79 @@ pub async fn scan_service_clears_results_correctly() -> Result<()> {

Ok(())
}

/// Tests that results for key are returned correctly
#[tokio::test]
pub async fn scan_service_get_results_for_key_correctly() -> Result<()> {
let mut db = new_test_storage(Network::Mainnet);

let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string();

for fake_result_height in [Height::MIN, Height(1), Height::MAX] {
db.insert_sapling_results(
&zec_pages_sapling_efvk,
fake_result_height,
fake_sapling_results([
TransactionIndex::MIN,
TransactionIndex::from_index(40),
TransactionIndex::MAX,
]),
);
}

assert!(
db.sapling_results(&zec_pages_sapling_efvk).len() == 3,
"there should be 3 heights for this key in the db"
);

for (_height, transactions) in db.sapling_results(&zec_pages_sapling_efvk) {
assert!(
transactions.len() == 3,
"there should be 3 transactions for each height for this key in the db"
);
}

// We don't need to send any command to the scanner for this call.
let (mut scan_service, _cmd_receiver) = ScanService::new_with_mock_scanner(db);

let response_fut = scan_service
.ready()
.await
.map_err(|err| eyre!(err))?
.call(Request::Results(vec![zec_pages_sapling_efvk.clone()]));

match response_fut.await.map_err(|err| eyre!(err))? {
Response::Results(results) => {
assert!(
results.contains_key(&zec_pages_sapling_efvk),
"results should contain the requested key"
);
assert!(results.len() == 1, "values are only for 1 key");

assert!(
results
.get_key_value(&zec_pages_sapling_efvk)
.unwrap()
.1
.len()
== 3,
"we should have 3 heights for the given key "
);

for transactions in results
.get_key_value(&zec_pages_sapling_efvk)
.unwrap()
.1
.values()
{
assert!(
transactions.len() == 3,
"there should be 3 transactions for each height for this key"
);
}
}
_ => panic!("scan service returned unexpected response variant"),
};

Ok(())
}
Loading