From 84a58747d75aab49e6101d1d1984462eadcfab75 Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Mon, 27 Jan 2025 13:50:08 -0400 Subject: [PATCH 01/10] impl ZcashIndexer for StateService --- zaino-state/src/state.rs | 575 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 569 insertions(+), 6 deletions(-) diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index 628199b5..c2c00ce3 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -1,15 +1,25 @@ //! Zcash chain fetch and tx submission service backed by Zebras [`ReadStateService`]. use chrono::Utc; -use hex::ToHex; +use futures::FutureExt as _; +use hex::{FromHex as _, ToHex as _}; use indexmap::IndexMap; use std::io::Cursor; +use std::str::FromStr as _; use std::{future::poll_fn, pin::pin}; use tokio::time::timeout; +use tonic::async_trait; use tower::Service; +use zaino_fetch::jsonrpc::response::TxidsResponse; use zaino_proto::proto::service::BlockRange; +use zebra_chain::block::{Height, SerializedBlock}; use zebra_chain::parameters::Network; -use zebra_rpc::methods::GetBlockTransaction; +use zebra_chain::subtree::NoteCommitmentSubtreeIndex; +use zebra_rpc::methods::trees::{GetSubtrees, GetTreestate, SubtreeRpcData}; +use zebra_rpc::methods::{ + AddressBalance, AddressStrings, GetAddressTxIdsRequest, GetAddressUtxos, GetBlockTransaction, + GetRawTransaction, SentTransactionHash, TransactionObject, +}; use zebra_rpc::server::error::LegacyCode; use zebra_chain::{ @@ -26,8 +36,12 @@ use zebra_rpc::{ }, sync::init_read_state_with_syncer, }; -use zebra_state::{ChainTipChange, HashOrHeight, LatestChainTip, ReadStateService}; +use zebra_state::{ + ChainTipChange, HashOrHeight, LatestChainTip, MinedTx, OutputLocation, ReadRequest, + ReadResponse, ReadStateService, TransactionLocation, +}; +use crate::indexer::ZcashIndexer; use crate::{ config::StateServiceConfig, error::StateServiceError, @@ -35,12 +49,23 @@ use crate::{ stream::CompactBlockStream, utils::{get_build_info, ServiceMetadata}, }; -use zaino_fetch::jsonrpc::connector::{test_node_and_return_uri, JsonRpcConnector}; +use zaino_fetch::jsonrpc::connector::{test_node_and_return_uri, JsonRpcConnector, RpcError}; use zaino_proto::proto::compact_formats::{ ChainMetadata, CompactBlock, CompactOrchardAction, CompactSaplingOutput, CompactSaplingSpend, CompactTx, }; +macro_rules! expected_read_response { + ($response:ident, $expected_variant:ident) => { + match $response { + ReadResponse::$expected_variant(inner) => inner, + unexpected => { + unreachable!("Unexpected response from state service: {unexpected:?}") + } + } + }; +} + /// Chain fetch service backed by Zebra's `ReadStateService` and `TrustedChainSync`. /// /// NOTE: We currently dop not implement clone for chain fetch services as this service is responsible for maintaining and closing its child processes. @@ -57,7 +82,7 @@ pub struct StateService { /// Sync task handle. sync_task_handle: Option>, /// JsonRPC Client. - _rpc_client: JsonRpcConnector, + rpc_client: JsonRpcConnector, /// Service metadata. data: ServiceMetadata, /// StateService config data. @@ -105,7 +130,7 @@ impl StateService { latest_chain_tip, _chain_tip_change: chain_tip_change, sync_task_handle: Some(sync_task_handle), - _rpc_client: rpc_client, + rpc_client, data, config, status: AtomicStatus::new(StatusType::Spawning.into()), @@ -193,6 +218,544 @@ impl StateService { } } +#[async_trait] +impl ZcashIndexer for StateService { + type Error = StateServiceError; + + async fn get_info(&self) -> Result { + Ok(GetInfo::from_parts( + self.data.zebra_build(), + self.data.zebra_subversion(), + )) + } + + async fn get_blockchain_info(&self) -> Result { + let response = self.checked_call(ReadRequest::TipPoolValues).await?; + let (height, hash, balance) = match response { + ReadResponse::TipPoolValues { + tip_height, + tip_hash, + value_balance, + } => (tip_height, tip_hash, value_balance), + unexpected => { + unreachable!("Unexpected response from state service: {unexpected:?}") + } + }; + let request = zebra_state::ReadRequest::BlockHeader(hash.into()); + let response = self.checked_call(request).await?; + let header = match response { + ReadResponse::BlockHeader { header, .. } => header, + unexpected => { + unreachable!("Unexpected response from state service: {unexpected:?}") + } + }; + let now = Utc::now(); + let zebra_estimated_height = + NetworkChainTipHeightEstimator::new(header.time, height, &self.config.network) + .estimate_height_at(now); + let estimated_height = if header.time > now || zebra_estimated_height < height { + height + } else { + zebra_estimated_height + }; + let upgrades = IndexMap::from_iter( + self.config + .network + .full_activation_list() + .into_iter() + .filter_map(|(activation_height, network_upgrade)| { + // Zebra defines network upgrades based on incompatible consensus rule changes, + // but zcashd defines them based on ZIPs. + // + // All the network upgrades with a consensus branch ID are the same in Zebra and zcashd. + network_upgrade.branch_id().map(|branch_id| { + // zcashd's RPC seems to ignore Disabled network upgrades, so Zebra does too. + let status = if height >= activation_height { + NetworkUpgradeStatus::Active + } else { + NetworkUpgradeStatus::Pending + }; + + ( + ConsensusBranchIdHex::new(branch_id.into()), + NetworkUpgradeInfo::from_parts( + network_upgrade, + activation_height, + status, + ), + ) + }) + }), + ); + let next_block_height = + (height + 1).expect("valid chain tips are a lot less than Height::MAX"); + let consensus = TipConsensusBranch::from_parts( + ConsensusBranchIdHex::new( + NetworkUpgrade::current(&self.config.network, height) + .branch_id() + .unwrap_or(ConsensusBranchId::RPC_MISSING_ID) + .into(), + ) + .inner(), + ConsensusBranchIdHex::new( + NetworkUpgrade::current(&self.config.network, next_block_height) + .branch_id() + .unwrap_or(ConsensusBranchId::RPC_MISSING_ID) + .into(), + ) + .inner(), + ); + + Ok(GetBlockChainInfo::new( + self.config.network.bip70_network_name(), + height, + hash, + estimated_height, + ValuePoolBalance::from_value_balance(balance), + upgrades, + consensus, + )) + } + + async fn z_get_address_balance( + &self, + address_strings: AddressStrings, + ) -> Result { + let strings_set = address_strings + .valid_addresses() + .map_err(|e| RpcError::new_from_errorobject(e, "invalid taddrs provided"))?; + let response = self + .checked_call(ReadRequest::AddressBalance(strings_set)) + .await?; + let balance = expected_read_response!(response, AddressBalance); + Ok(AddressBalance { + balance: u64::from(balance), + }) + } + + async fn send_raw_transaction( + &self, + raw_transaction_hex: String, + ) -> Result { + // Offload to the json rpc connector, as ReadStateService doesn't yet interface with the mempool + self.rpc_client + .send_raw_transaction(raw_transaction_hex) + .await + .map(SentTransactionHash::from) + .map_err(StateServiceError::JsonRpcConnectorError) + } + + async fn z_get_block( + &self, + hash_or_height_string: String, + verbosity: Option, + ) -> Result { + let verbosity = verbosity.unwrap_or(1); + let hash_or_height = HashOrHeight::from_str(&hash_or_height_string); + match verbosity { + 0 => { + let request = ReadRequest::Block(hash_or_height?); + let response = self.checked_call(request).await?; + let block = expected_read_response!(response, Block); + block.map(SerializedBlock::from).map(GetBlock::Raw).ok_or( + StateServiceError::RpcError(RpcError::new_from_legacycode( + LegacyCode::InvalidParameter, + "block not found", + )), + ) + } + 1 | 2 => { + let hash_or_height = hash_or_height?; + let txids_or_fullblock_request = match verbosity { + 1 => ReadRequest::TransactionIdsForBlock(hash_or_height), + 2 => ReadRequest::Block(hash_or_height), + _ => unreachable!("verbosity is known to be 1 or 2"), + }; + + let (txids_or_fullblock, orchard_tree_response, header) = futures::join!( + self.checked_call(txids_or_fullblock_request), + self.checked_call(ReadRequest::OrchardTree(hash_or_height)), + self.get_block_header(hash_or_height_string, Some(true)) + ); + + let header_obj = match header? { + GetBlockHeader::Raw(_hex_data) => unreachable!( + "`true` was passed to get_block_header, an object should be returned" + ), + GetBlockHeader::Object(get_block_header_object) => get_block_header_object, + }; + let GetBlockHeaderObject { + hash, + confirmations, + height, + version, + merkle_root, + final_sapling_root, + sapling_tree_size, + time, + nonce, + solution, + bits, + difficulty, + previous_block_hash, + next_block_hash, + } = *header_obj; + + let transactions_response: Vec = match txids_or_fullblock { + Ok(ReadResponse::TransactionIdsForBlock(Some(txids))) => Ok(txids + .iter() + .copied() + .map(GetBlockTransaction::Hash) + .collect()), + Ok(ReadResponse::Block(Some(block))) => Ok(block + .transactions + .iter() + .map(|transaction| { + GetBlockTransaction::Object(TransactionObject { + hex: transaction.as_ref().into(), + height: Some(height.0), + // Confirmations should never be greater than the current block height + confirmations: Some(confirmations as u32), + }) + }) + .collect()), + Ok(ReadResponse::TransactionIdsForBlock(None)) + | Ok(ReadResponse::Block(None)) => { + Err(StateServiceError::RpcError(RpcError::new_from_legacycode( + LegacyCode::InvalidParameter, + "block not found", + ))) + } + Ok(unexpected) => { + unreachable!("Unexpected response from state service: {unexpected:?}") + } + Err(e) => Err(e), + }?; + + let orchard_tree_response = orchard_tree_response?; + let orchard_tree = expected_read_response!(orchard_tree_response, OrchardTree) + .ok_or(StateServiceError::RpcError(RpcError::new_from_legacycode( + LegacyCode::Misc, + "missing orchard tree", + )))?; + + let final_orchard_root = + match NetworkUpgrade::Nu5.activation_height(&self.config.network) { + Some(activation_height) if height >= activation_height => { + Some(orchard_tree.root().into()) + } + _otherwise => None, + }; + + let trees = GetBlockTrees::new(sapling_tree_size, orchard_tree.count()); + + Ok(GetBlock::Object { + hash, + confirmations, + height: Some(height), + version: Some(version), + merkle_root: Some(merkle_root), + time: Some(time), + nonce: Some(nonce), + solution: Some(solution), + bits: Some(bits), + difficulty: Some(difficulty), + tx: transactions_response, + trees, + size: None, + final_sapling_root: Some(final_sapling_root), + final_orchard_root, + previous_block_hash: Some(previous_block_hash), + next_block_hash, + }) + } + more_than_two => Err(StateServiceError::RpcError(RpcError::new_from_legacycode( + LegacyCode::InvalidParameter, + format!("invalid verbosity of {more_than_two}"), + ))), + } + } + + async fn get_raw_mempool(&self) -> Result, Self::Error> { + let txids = self.rpc_client.get_raw_mempool().await?; + Ok(txids.transactions) + } + + async fn z_get_treestate(&self, hash_or_height: String) -> Result { + let hash_or_height = HashOrHeight::from_str(&hash_or_height)?; + let block_header_response = self + .checked_call(ReadRequest::BlockHeader(hash_or_height)) + .await?; + let (header, hash, height) = match block_header_response { + ReadResponse::BlockHeader { + header, + hash, + height, + .. + } => (header, hash, height), + unexpected => { + unreachable!("Unexpected response from state service: {unexpected:?}") + } + }; + + let sapling = match NetworkUpgrade::Sapling.activation_height(&self.config.network) { + Some(activation_height) if height >= activation_height => Some( + self.checked_call(ReadRequest::SaplingTree(hash_or_height)) + .await?, + ), + _ => None, + } + .map(|sap_response| { + expected_read_response!(sap_response, SaplingTree).map(|tree| tree.to_rpc_bytes()) + }) + .flatten(); + let orchard = match NetworkUpgrade::Nu5.activation_height(&self.config.network) { + Some(activation_height) if height >= activation_height => Some( + self.checked_call(ReadRequest::OrchardTree(hash_or_height)) + .await?, + ), + _ => None, + } + .map(|orch_response| { + expected_read_response!(orch_response, OrchardTree).map(|tree| tree.to_rpc_bytes()) + }) + .flatten(); + Ok(GetTreestate::from_parts( + hash, + height, + // If the timestamp is pre-unix epoch, something has gone terribly wrong + u32::try_from(header.time.timestamp()).unwrap(), + sapling, + orchard, + )) + } + + async fn z_get_subtrees_by_index( + &self, + pool: String, + start_index: NoteCommitmentSubtreeIndex, + limit: Option, + ) -> Result { + match pool.as_str() { + "sapling" => { + let request = zebra_state::ReadRequest::SaplingSubtrees { start_index, limit }; + let response = self.checked_call(request).await?; + let sapling_subtrees = expected_read_response!(response, SaplingSubtrees); + let subtrees = sapling_subtrees + .values() + .map(|subtree| SubtreeRpcData { + root: subtree.root.encode_hex(), + end_height: subtree.end_height, + }) + .collect(); + Ok(GetSubtrees { + pool, + start_index, + subtrees, + }) + } + "orchard" => { + let request = zebra_state::ReadRequest::OrchardSubtrees { start_index, limit }; + let response = self.checked_call(request).await?; + let orchard_subtrees = expected_read_response!(response, OrchardSubtrees); + let subtrees = orchard_subtrees + .values() + .map(|subtree| SubtreeRpcData { + root: subtree.root.encode_hex(), + end_height: subtree.end_height, + }) + .collect(); + Ok(GetSubtrees { + pool, + start_index, + subtrees, + }) + } + otherwise => Err(StateServiceError::RpcError(RpcError::new_from_legacycode( + LegacyCode::Misc, + format!("invalid pool name \"{otherwise}\", must be \"sapling\" or \"orchard\""), + ))), + } + } + + async fn get_raw_transaction( + &self, + txid_hex: String, + verbose: Option, + ) -> Result { + let txid = zebra_chain::transaction::Hash::from_hex(txid_hex).map_err(|e| { + RpcError::new_from_legacycode(LegacyCode::InvalidAddressOrKey, e.to_string()) + })?; + + // check the mempool for the transaction + let mempool_transaction_future = self.rpc_client.get_raw_mempool().then(|result| async { + result.map(|TxidsResponse { transactions }| { + transactions + .into_iter() + .find(|mempool_txid| *mempool_txid == txid.to_string()) + }) + }); + let onchain_transaction_future = self.checked_call(ReadRequest::Transaction(txid)); + + futures::pin_mut!(mempool_transaction_future); + futures::pin_mut!(onchain_transaction_future); + + // This might be overengineered...try to find the txid on chain and in the mempool, + // whichever one resolves first is tried first. + let resolution = + futures::future::select(mempool_transaction_future, onchain_transaction_future).await; + + let handle_mempool = |txid| async { + self.rpc_client + .get_raw_transaction(txid, verbose) + .await + .map(|gtr| GetRawTransaction::from(gtr)) + .map_err(StateServiceError::JsonRpcConnectorError) + }; + + let handle_onchain = |response| { + let transaction = expected_read_response!(response, Transaction); + match transaction { + Some(MinedTx { + tx, + height, + confirmations, + }) => Ok(match verbose { + Some(_verbosity) => GetRawTransaction::Object(TransactionObject { + hex: tx.into(), + height: Some(height.0), + confirmations: Some(confirmations), + }), + None => GetRawTransaction::Raw(tx.into()), + }), + None => Err(StateServiceError::RpcError(RpcError::new_from_legacycode( + LegacyCode::InvalidAddressOrKey, + "No such mempool or main chain transaction", + ))), + } + }; + + match resolution { + futures::future::Either::Left((response, other_fut)) => match response? { + Some(txid) => handle_mempool(txid).await, + None => { + let response = other_fut.await?; + handle_onchain(response) + } + }, + futures::future::Either::Right((response, other_fut)) => { + match handle_onchain(response?) { + Ok(val) => Ok(val), + Err(e) => match other_fut.await? { + Some(txid) => handle_mempool(txid).await, + None => Err(e), + }, + } + } + } + } + + async fn get_address_tx_ids( + &self, + request: GetAddressTxIdsRequest, + ) -> Result, Self::Error> { + let (addresses, start, end) = request.into_parts(); + let chain_height = + self.latest_chain_tip + .best_tip_height() + .ok_or(RpcError::new_from_legacycode( + LegacyCode::Misc, + "no blocks in chain", + ))?; + + let mut error_string = None; + if start == 0 || end == 0 { + error_string = Some(format!( + "start {start:?} and end {end:?} must both be greater than zero" + )); + } + if start > end { + error_string = Some(format!( + "start {start:?} must be less than or equal to end {end:?}" + )); + } + if Height(start) > chain_height || Height(end) > chain_height { + error_string = Some(format!( + "start {start:?} and end {end:?} must both be less than or equal to the chain tip {chain_height:?}") + ); + } + + if let Some(e) = error_string { + return Err(StateServiceError::RpcError(RpcError::new_from_legacycode( + LegacyCode::InvalidParameter, + e, + ))); + } + + let request = ReadRequest::TransactionIdsByAddresses { + addresses: AddressStrings::new_valid(addresses) + .and_then(|addrs| addrs.valid_addresses()) + .map_err(|e| RpcError::new_from_errorobject(e, "invalid adddress"))?, + + height_range: Height(start)..=Height(end), + }; + + let response = self.checked_call(request).await?; + + let hashes = expected_read_response!(response, AddressesTransactionIds); + + let mut last_tx_location = TransactionLocation::from_usize(Height(0), 0); + + Ok(hashes + .iter() + .map(|(tx_loc, tx_id)| { + // Check that the returned transactions are in chain order. + assert!( + *tx_loc > last_tx_location, + "Transactions were not in chain order:\n\ + {tx_loc:?} {tx_id:?} was after:\n\ + {last_tx_location:?}", + ); + + last_tx_location = *tx_loc; + + tx_id.to_string() + }) + .collect()) + } + + async fn z_get_address_utxos( + &self, + address_strings: AddressStrings, + ) -> Result, Self::Error> { + let valid_addresses = address_strings + .valid_addresses() + .map_err(|e| RpcError::new_from_errorobject(e, "invalid address"))?; + let request = ReadRequest::UtxosByAddresses(valid_addresses); + let response = self.checked_call(request).await?; + let utxos = expected_read_response!(response, AddressUtxos); + let mut last_output_location = OutputLocation::from_usize(Height(0), 0, 0); + Ok(utxos + .utxos() + .map(|utxo| { + assert!(utxo.2 > &last_output_location); + last_output_location = *utxo.2; + // What an odd argument order for from_parts + // at least they are all different types, so they can't be + // supplied in the wrong order + GetAddressUtxos::from_parts( + utxo.0, + *utxo.1, + utxo.2.output_index(), + utxo.3.lock_script.clone(), + u64::from(utxo.3.value()), + utxo.2.height(), + ) + }) + .collect()) + } +} + impl Drop for StateService { fn drop(&mut self) { if let Some(handle) = self.sync_task_handle.take() { From 0bfa0684fc6644d134d8b7cd3013e60cd9135800 Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Mon, 3 Feb 2025 14:08:35 -0400 Subject: [PATCH 02/10] clippy fix --- zaino-state/src/fetch.rs | 10 +++++----- zaino-state/src/state.rs | 12 +++++------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/zaino-state/src/fetch.rs b/zaino-state/src/fetch.rs index 3a388ca8..3ec64bf2 100644 --- a/zaino-state/src/fetch.rs +++ b/zaino-state/src/fetch.rs @@ -1948,7 +1948,7 @@ mod tests { .unwrap(); dbg!(recipient_balance.clone()); - dbg!(fetch_service_balance.clone()); + dbg!(fetch_service_balance); assert_eq!(recipient_balance.transparent_balance.unwrap(), 250_000,); assert_eq!( @@ -2619,13 +2619,13 @@ mod tests { let mut sorted_fetch_mempool_tx = fetch_mempool_tx.clone(); sorted_fetch_mempool_tx.sort_by_key(|tx| tx.hash.clone()); - let mut tx1_bytes = tx_1.first().as_ref().clone(); + let mut tx1_bytes = *tx_1.first().as_ref(); tx1_bytes.reverse(); - let mut tx2_bytes = tx_2.first().as_ref().clone(); + let mut tx2_bytes = *tx_2.first().as_ref(); tx2_bytes.reverse(); - let mut sorted_txids = vec![tx1_bytes, tx2_bytes]; - sorted_txids.sort_by_key(|hash| hash.clone()); + let mut sorted_txids = [tx1_bytes, tx2_bytes]; + sorted_txids.sort_by_key(|hash| *hash); assert_eq!(sorted_fetch_mempool_tx[0].hash, sorted_txids[0]); assert_eq!(sorted_fetch_mempool_tx[1].hash, sorted_txids[1]); diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index 0a5010d6..b26da546 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -505,10 +505,9 @@ impl ZcashIndexer for StateService { ), _ => None, } - .map(|sap_response| { + .and_then(|sap_response| { expected_read_response!(sap_response, SaplingTree).map(|tree| tree.to_rpc_bytes()) - }) - .flatten(); + }); let orchard = match NetworkUpgrade::Nu5.activation_height(&self.config.network) { Some(activation_height) if height >= activation_height => Some( self.checked_call(ReadRequest::OrchardTree(hash_or_height)) @@ -516,10 +515,9 @@ impl ZcashIndexer for StateService { ), _ => None, } - .map(|orch_response| { + .and_then(|orch_response| { expected_read_response!(orch_response, OrchardTree).map(|tree| tree.to_rpc_bytes()) - }) - .flatten(); + }); Ok(GetTreestate::from_parts( hash, height, @@ -609,7 +607,7 @@ impl ZcashIndexer for StateService { self.rpc_client .get_raw_transaction(txid, verbose) .await - .map(|gtr| GetRawTransaction::from(gtr)) + .map(GetRawTransaction::from) .map_err(StateServiceError::JsonRpcConnectorError) }; From a4f9c1f30e89a7c1da0fe82c0decc83f964318a1 Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Mon, 3 Feb 2025 14:10:01 -0400 Subject: [PATCH 03/10] fix sort assertion --- zaino-state/src/fetch.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zaino-state/src/fetch.rs b/zaino-state/src/fetch.rs index 3ec64bf2..c5312953 100644 --- a/zaino-state/src/fetch.rs +++ b/zaino-state/src/fetch.rs @@ -2048,7 +2048,9 @@ mod tests { dbg!(&fetch_service_mempool); dbg!(&json_service_mempool); - assert_eq!(json_service_mempool.sort(), fetch_service_mempool.sort()); + json_service_mempool.sort(); + fetch_service_mempool.sort(); + assert_eq!(json_service_mempool, fetch_service_mempool); test_manager.close().await; } From 58118b8bb0c0e3945644b470a8b9314cd8e32012 Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Mon, 3 Feb 2025 16:16:55 -0400 Subject: [PATCH 04/10] test fiddling, add a test that doesn't work yet --- zaino-state/src/state.rs | 238 +++++++++++++-------------------------- 1 file changed, 79 insertions(+), 159 deletions(-) diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index b26da546..91a00bf3 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -1629,6 +1629,46 @@ mod tests { use zaino_testutils::{TestManager, ZEBRAD_CHAIN_CACHE_BIN, ZEBRAD_TESTNET_CACHE_BIN}; use zebra_chain::parameters::Network; use zingo_infra_services::validator::Validator; + async fn create_test_manager_and_state_service( + enable_zaino: bool, + zaino_no_sync: bool, + zaino_no_db: bool, + enable_clients: bool, + ) -> (TestManager, StateService) { + let test_manager = TestManager::launch( + "zebrad", + Some(zingo_infra_services::network::Network::Testnet), + ZEBRAD_TESTNET_CACHE_BIN.clone(), + enable_zaino, + zaino_no_sync, + zaino_no_db, + enable_clients, + ) + .await + .unwrap(); + + let state_service = StateService::spawn(StateServiceConfig::new( + zebra_state::Config { + cache_dir: test_manager.data_dir.clone(), + ephemeral: false, + delete_old_database: true, + debug_stop_at_height: None, + debug_validity_check_interval: None, + }, + SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), + test_manager.zebrad_rpc_listen_port, + ), + None, + None, + None, + None, + Network::new_default_testnet(), + )) + .await + .unwrap(); + (test_manager, state_service) + } #[tokio::test] async fn launch_state_regtest_service_no_cache() { @@ -1947,38 +1987,8 @@ mod tests { #[tokio::test] async fn state_service_regtest_get_block_raw() { - let mut test_manager = TestManager::launch( - "zebrad", - None, - ZEBRAD_CHAIN_CACHE_BIN.clone(), - false, - true, - true, - false, - ) - .await - .unwrap(); - - let state_service = StateService::spawn(StateServiceConfig::new( - zebra_state::Config { - cache_dir: test_manager.data_dir.clone(), - ephemeral: false, - delete_old_database: true, - debug_stop_at_height: None, - debug_validity_check_interval: None, - }, - SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), - test_manager.zebrad_rpc_listen_port, - ), - None, - None, - None, - None, - Network::new_regtest(Some(1), Some(1)), - )) - .await - .unwrap(); + let (mut test_manager, state_service) = + create_test_manager_and_state_service(false, true, true, false).await; let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new( url::Url::parse(&format!( "http://127.0.0.1:{}", @@ -2020,38 +2030,8 @@ mod tests { #[tokio::test] async fn state_service_regtest_get_block_object() { - let mut test_manager = TestManager::launch( - "zebrad", - None, - ZEBRAD_CHAIN_CACHE_BIN.clone(), - false, - true, - true, - false, - ) - .await - .unwrap(); - - let state_service = StateService::spawn(StateServiceConfig::new( - zebra_state::Config { - cache_dir: test_manager.data_dir.clone(), - ephemeral: false, - delete_old_database: true, - debug_stop_at_height: None, - debug_validity_check_interval: None, - }, - SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), - test_manager.zebrad_rpc_listen_port, - ), - None, - None, - None, - None, - Network::new_regtest(Some(1), Some(1)), - )) - .await - .unwrap(); + let (mut test_manager, state_service) = + create_test_manager_and_state_service(false, true, true, false).await; let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new( url::Url::parse(&format!( "http://127.0.0.1:{}", @@ -2124,38 +2104,8 @@ mod tests { /// WARNING: This tests needs refactoring due to code removed in zaino-state. #[tokio::test] async fn state_service_regtest_get_block_compact() { - let mut test_manager = TestManager::launch( - "zebrad", - None, - ZEBRAD_CHAIN_CACHE_BIN.clone(), - false, - true, - true, - false, - ) - .await - .unwrap(); - - let state_service = StateService::spawn(StateServiceConfig::new( - zebra_state::Config { - cache_dir: test_manager.data_dir.clone(), - ephemeral: false, - delete_old_database: true, - debug_stop_at_height: None, - debug_validity_check_interval: None, - }, - SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), - test_manager.zebrad_rpc_listen_port, - ), - None, - None, - None, - None, - Network::new_regtest(Some(1), Some(1)), - )) - .await - .unwrap(); + let (mut test_manager, state_service) = + create_test_manager_and_state_service(false, true, true, false).await; let state_start = tokio::time::Instant::now(); let state_service_get_compact_block = StateService::get_compact_block( @@ -2180,17 +2130,8 @@ mod tests { /// WARNING: This tests needs refactoring due to code removed in zaino-state. #[tokio::test] async fn state_service_regtest_get_block_range() { - let mut test_manager = TestManager::launch( - "zebrad", - None, - ZEBRAD_CHAIN_CACHE_BIN.clone(), - false, - true, - true, - false, - ) - .await - .unwrap(); + let (mut test_manager, state_service) = + create_test_manager_and_state_service(false, true, true, false).await; let block_range = BlockRange { start: Some(BlockId { height: 50, @@ -2201,26 +2142,6 @@ mod tests { hash: Vec::new(), }), }; - let state_service = StateService::spawn(StateServiceConfig::new( - zebra_state::Config { - cache_dir: test_manager.data_dir.clone(), - ephemeral: false, - delete_old_database: true, - debug_stop_at_height: None, - debug_validity_check_interval: None, - }, - SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), - test_manager.zebrad_rpc_listen_port, - ), - None, - None, - None, - None, - Network::new_regtest(Some(1), Some(1)), - )) - .await - .unwrap(); let state_start = tokio::time::Instant::now(); let state_service_stream = state_service @@ -2248,17 +2169,8 @@ mod tests { #[tokio::test] async fn state_service_testnet_get_block_range_large() { - let mut test_manager = TestManager::launch( - "zebrad", - Some(zingo_infra_services::network::Network::Testnet), - ZEBRAD_TESTNET_CACHE_BIN.clone(), - false, - true, - true, - false, - ) - .await - .unwrap(); + let (mut test_manager, state_service) = + create_test_manager_and_state_service(false, true, true, false).await; let block_range = BlockRange { start: Some(BlockId { @@ -2271,27 +2183,6 @@ mod tests { }), }; - let state_service = StateService::spawn(StateServiceConfig::new( - zebra_state::Config { - cache_dir: test_manager.data_dir.clone(), - ephemeral: false, - delete_old_database: true, - debug_stop_at_height: None, - debug_validity_check_interval: None, - }, - SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), - test_manager.zebrad_rpc_listen_port, - ), - None, - None, - None, - None, - Network::new_default_testnet(), - )) - .await - .unwrap(); - let num_blocks = block_range.clone().end.unwrap().height - block_range.clone().start.unwrap().height; println!("Fetching {} blocks in range: {:?}", num_blocks, block_range); @@ -2313,6 +2204,35 @@ mod tests { println!("Last block in range: {:?}", state_blocks.last()); println!("GetBlockRange response received. State-Service fetch 1,000,000 blocks in processing time: {:?}.", state_service_duration); + test_manager.close().await; + } + #[tokio::test] + async fn state_service_testnet_get_blockchain_info() { + let (mut test_manager, state_service) = + create_test_manager_and_state_service(false, true, true, false).await; + + let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new( + url::Url::parse(&format!( + "http://127.0.0.1:{}", + test_manager.zebrad_rpc_listen_port + )) + .expect("Failed to construct URL") + .as_str() + .try_into() + .expect("Failed to convert URL to URI"), + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await + .unwrap(); + let state_service_bcinfo = state_service.get_blockchain_info().await.unwrap(); + let fetch_service_bcinfo = fetch_service.get_blockchain_info().await.unwrap(); + + dbg!(state_service_bcinfo); + dbg!(fetch_service_bcinfo); + + // assert_eq!(state_service_bcinfo, fetch_service_bcinfo); + test_manager.close().await; } } From da19187d3752f660747d3100e06411267a7f510b Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Thu, 6 Feb 2025 14:35:32 -0400 Subject: [PATCH 05/10] removed unused code, import juggling --- zaino-state/src/state.rs | 375 ++++----------------------------------- 1 file changed, 30 insertions(+), 345 deletions(-) diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index 91a00bf3..e69af8a9 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -1,39 +1,53 @@ //! Zcash chain fetch and tx submission service backed by Zebras [`ReadStateService`]. +use crate::{ + config::StateServiceConfig, + error::StateServiceError, + indexer::ZcashIndexer, + status::{AtomicStatus, StatusType}, + stream::CompactBlockStream, + utils::{get_build_info, ServiceMetadata}, +}; + use chrono::Utc; use futures::FutureExt as _; use hex::{FromHex as _, ToHex as _}; use indexmap::IndexMap; +use std::future::poll_fn; use std::io::Cursor; use std::str::FromStr as _; -use std::{future::poll_fn, pin::pin}; use tokio::time::timeout; use tonic::async_trait; use tower::Service; + +use zaino_fetch::jsonrpc::connector::{test_node_and_return_uri, JsonRpcConnector, RpcError}; use zaino_fetch::jsonrpc::response::TxidsResponse; -use zaino_proto::proto::service::BlockRange; -use zebra_chain::block::{Height, SerializedBlock}; -use zebra_chain::parameters::Network; -use zebra_chain::subtree::NoteCommitmentSubtreeIndex; -use zebra_rpc::methods::trees::{GetSubtrees, GetTreestate, SubtreeRpcData}; -use zebra_rpc::methods::{ - AddressBalance, AddressStrings, GetAddressTxIdsRequest, GetAddressUtxos, GetBlockTransaction, - GetRawTransaction, SentTransactionHash, TransactionObject, +use zaino_proto::proto::compact_formats::{ + ChainMetadata, CompactBlock, CompactOrchardAction, CompactSaplingOutput, CompactSaplingSpend, + CompactTx, }; -use zebra_rpc::server::error::LegacyCode; +use zaino_proto::proto::service::BlockRange; use zebra_chain::{ + block::{Height, SerializedBlock}, chain_tip::{ChainTip, NetworkChainTipHeightEstimator}, - parameters::{ConsensusBranchId, NetworkUpgrade}, + parameters::{ConsensusBranchId, Network, NetworkUpgrade}, serialization::{ZcashDeserialize, ZcashSerialize}, + subtree::NoteCommitmentSubtreeIndex, transaction::Transaction, }; use zebra_rpc::{ methods::{ - hex_data::HexData, types::ValuePoolBalance, ConsensusBranchIdHex, GetBlock, - GetBlockChainInfo, GetBlockHash, GetBlockHeader, GetBlockHeaderObject, GetBlockTrees, - GetInfo, NetworkUpgradeInfo, NetworkUpgradeStatus, TipConsensusBranch, + hex_data::HexData, + trees::{GetSubtrees, GetTreestate, SubtreeRpcData}, + types::ValuePoolBalance, + AddressBalance, AddressStrings, ConsensusBranchIdHex, GetAddressTxIdsRequest, + GetAddressUtxos, GetBlock, GetBlockChainInfo, GetBlockHash, GetBlockHeader, + GetBlockHeaderObject, GetBlockTransaction, GetBlockTrees, GetInfo, GetRawTransaction, + NetworkUpgradeInfo, NetworkUpgradeStatus, SentTransactionHash, TipConsensusBranch, + TransactionObject, }, + server::error::LegacyCode, sync::init_read_state_with_syncer, }; use zebra_state::{ @@ -41,20 +55,6 @@ use zebra_state::{ ReadResponse, ReadStateService, TransactionLocation, }; -use crate::indexer::ZcashIndexer; -use crate::{ - config::StateServiceConfig, - error::StateServiceError, - status::{AtomicStatus, StatusType}, - stream::CompactBlockStream, - utils::{get_build_info, ServiceMetadata}, -}; -use zaino_fetch::jsonrpc::connector::{test_node_and_return_uri, JsonRpcConnector, RpcError}; -use zaino_proto::proto::compact_formats::{ - ChainMetadata, CompactBlock, CompactOrchardAction, CompactSaplingOutput, CompactSaplingSpend, - CompactTx, -}; - macro_rules! expected_read_response { ($response:ident, $expected_variant:ident) => { match $response { @@ -768,321 +768,6 @@ impl Drop for StateService { /// /// TODO: Update this to be `impl ZcashIndexer for StateService` once rpc methods are implemented and tested (or implement separately). impl StateService { - /// Returns software information from the RPC server, as a [`GetInfo`] JSON struct. - /// - /// zcashd reference: [`getinfo`](https://zcash.github.io/rpc/getinfo.html) - /// method: post - /// tags: control - /// - /// # Notes - /// - /// [The zcashd reference](https://zcash.github.io/rpc/getinfo.html) might not show some fields - /// in Zebra's [`GetInfo`]. Zebra uses the field names and formats from the - /// [zcashd code](https://github.com/zcash/zcash/blob/v4.6.0-1/src/rpc/misc.cpp#L86-L87). - /// - /// Some fields from the zcashd reference are missing from Zebra's [`GetInfo`]. It only contains the fields - /// [required for lightwalletd support.](https://github.com/zcash/lightwalletd/blob/v0.4.9/common/common.go#L91-L95) - pub async fn get_info(&self) -> Result { - Ok(GetInfo::from_parts( - self.data.zebra_build(), - self.data.zebra_subversion(), - )) - } - - /// Returns blockchain state information, as a [`GetBlockChainInfo`] JSON struct. - /// - /// zcashd reference: [`getblockchaininfo`](https://zcash.github.io/rpc/getblockchaininfo.html) - /// method: post - /// tags: blockchain - /// - /// # Notes - /// - /// Some fields from the zcashd reference are missing from Zebra's [`GetBlockChainInfo`]. It only contains the fields - /// [required for lightwalletd support.](https://github.com/zcash/lightwalletd/blob/v0.4.9/common/common.go#L72-L89) - pub async fn get_blockchain_info(&self) -> Result { - let network = self.data.network(); - let chain = network.bip70_network_name(); - - // Fetch Pool Values - let pool_values = self - .checked_call(zebra_state::ReadRequest::TipPoolValues) - .await?; - let zebra_state::ReadResponse::TipPoolValues { - tip_height, - tip_hash, - value_balance, - } = pool_values - else { - return Err(StateServiceError::Custom( - "Unexpected response type for TipPoolValues".into(), - )); - }; - - // Calculate Estimated height - let block_header = self - .checked_call(zebra_state::ReadRequest::BlockHeader(tip_hash.into())) - .await?; - let zebra_state::ReadResponse::BlockHeader { header, .. } = block_header else { - return Err(StateServiceError::Custom( - "Unexpected response type for BlockHeader".into(), - )); - }; - let tip_block_time = header.time; - let now = Utc::now(); - let zebra_estimated_height = - NetworkChainTipHeightEstimator::new(tip_block_time, tip_height, &network) - .estimate_height_at(now); - let estimated_height = if tip_block_time > now || zebra_estimated_height < tip_height { - tip_height - } else { - zebra_estimated_height - }; - - // Create `upgrades` object - // - // Get the network upgrades in height order, like `zebra` `zcashd`. - let mut upgrades = IndexMap::new(); - for (activation_height, network_upgrade) in network.full_activation_list() { - // Zebra defines network upgrades based on incompatible consensus rule changes, - // but zcashd defines them based on ZIPs. - // - // All the network upgrades with a consensus branch ID are the same in Zebra and zcashd. - if let Some(branch_id) = network_upgrade.branch_id() { - // zcashd's RPC seems to ignore Disabled network upgrades, so Zaino does too. - let status = if tip_height >= activation_height { - NetworkUpgradeStatus::Active - } else { - NetworkUpgradeStatus::Pending - }; - - let upgrade = - NetworkUpgradeInfo::from_parts(network_upgrade, activation_height, status); - upgrades.insert(ConsensusBranchIdHex::new(branch_id.into()), upgrade); - } - } - - // Create `consensus` object - let next_block_height = - (tip_height + 1).expect("valid chain tips are a lot less than Height::MAX"); - let consensus = TipConsensusBranch::from_parts( - NetworkUpgrade::current(&network, tip_height) - .branch_id() - .unwrap_or(ConsensusBranchId::RPC_MISSING_ID) - .into(), - NetworkUpgrade::current(&network, next_block_height) - .branch_id() - .unwrap_or(ConsensusBranchId::RPC_MISSING_ID) - .into(), - ); - - let response = GetBlockChainInfo::new( - chain, - tip_height, - tip_hash, - estimated_height, - ValuePoolBalance::from_value_balance(value_balance), - upgrades, - consensus, - ); - - Ok(response) - } - - /// Returns the requested block by hash or height, as a [`GetBlock`] JSON string. - /// If the block is not in Zebra's state, returns - /// [error code `-8`.](https://github.com/zcash/zcash/issues/5758) if a height was - /// passed or -5 if a hash was passed. - /// - /// zcashd reference: [`getblock`](https://zcash.github.io/rpc/getblock.html) - /// method: post - /// tags: blockchain - /// - /// # Parameters - /// - /// - `hash_or_height`: (string, required, example="1") The hash or height for the block to be returned. - /// - `verbosity`: (number, optional, default=1, example=1) 0 for hex encoded data, 1 for a json object, and 2 for json object with transaction data. - /// - /// # Notes - /// - /// Zebra previously partially supported verbosity=1 by returning only the - /// fields required by lightwalletd ([`lightwalletd` only reads the `tx` - /// field of the result](https://github.com/zcash/lightwalletd/blob/dfac02093d85fb31fb9a8475b884dd6abca966c7/common/common.go#L152)). - /// That verbosity level was migrated to "3"; so while lightwalletd will - /// still work by using verbosity=1, it will sync faster if it is changed to - /// use verbosity=3. - /// - /// The undocumented `chainwork` field is not returned. - pub async fn get_block( - &self, - hash_or_height: String, - verbosity: Option, - ) -> Result { - // From - const DEFAULT_GETBLOCK_VERBOSITY: u8 = 1; - - let verbosity = verbosity.unwrap_or(DEFAULT_GETBLOCK_VERBOSITY); - let network = self.data.network().clone(); - let original_hash_or_height = hash_or_height.clone(); - - // If verbosity requires a call to `get_block_header`, resolve it here - let get_block_header_future = if matches!(verbosity, 1 | 2) { - Some(self.get_block_header(original_hash_or_height.clone(), Some(true))) - } else { - None - }; - - let hash_or_height: HashOrHeight = hash_or_height.parse()?; - - if verbosity == 0 { - // # Performance - // - // This RPC is used in `lightwalletd`'s initial sync of 2 million blocks, - // so it needs to load block data very efficiently. - match self - .checked_call(zebra_state::ReadRequest::Block(hash_or_height)) - .await? - { - zebra_state::ReadResponse::Block(Some(block)) => Ok(GetBlock::Raw(block.into())), - zebra_state::ReadResponse::Block(None) => Err(StateServiceError::RpcError( - zaino_fetch::jsonrpc::connector::RpcError { - code: LegacyCode::InvalidParameter as i64, - message: "Block not found".to_string(), - data: None, - }, - )), - _ => unreachable!("unmatched response to a block request"), - } - } else if let Some(get_block_header_future) = get_block_header_future { - let GetBlockHeader::Object(block_header) = get_block_header_future.await? else { - return Err(StateServiceError::Custom( - "Unexpected response type for BlockHeader".into(), - )); - }; - let GetBlockHeaderObject { - hash, - confirmations, - height, - version, - merkle_root, - final_sapling_root, - sapling_tree_size, - time, - nonce, - solution, - bits, - difficulty, - previous_block_hash, - next_block_hash, - } = *block_header; - - // # Concurrency - // - // We look up by block hash so the hash, transaction IDs, and confirmations - // are consistent. - let hash_or_height = hash.0.into(); - - let mut txids_future = pin!(self.checked_call( - zebra_state::ReadRequest::TransactionIdsForBlock(hash_or_height) - )); - let mut orchard_tree_future = - pin!(self.checked_call(zebra_state::ReadRequest::OrchardTree(hash_or_height))); - - let mut txids = None; - let mut orchard_trees = None; - let mut final_orchard_root = None; - - while txids.is_none() || orchard_trees.is_none() { - tokio::select! { - response = &mut txids_future, if txids.is_none() => { - let tx_ids_response = response?; - let tx_ids = match tx_ids_response { - zebra_state::ReadResponse::TransactionIdsForBlock(Some(tx_ids)) => tx_ids, - zebra_state::ReadResponse::TransactionIdsForBlock(None) => { - return Err(StateServiceError::RpcError(zaino_fetch::jsonrpc::connector::RpcError { - code: LegacyCode::InvalidParameter as i64, - message: "Block not found".to_string(), - data: None, - })); - } - _ => unreachable!("Unexpected response type for TransactionIdsForBlock"), - }; - - txids = Some(tx_ids.iter().map(|tx_id| tx_id.encode_hex()).collect::>()); - } - response = &mut orchard_tree_future, if orchard_trees.is_none() => { - let orchard_tree_response = response?; - let orchard_tree = match orchard_tree_response { - zebra_state::ReadResponse::OrchardTree(Some(tree)) => tree, - zebra_state::ReadResponse::OrchardTree(None) => { - return Err(StateServiceError::RpcError(zaino_fetch::jsonrpc::connector::RpcError { - code: LegacyCode::InvalidParameter as i64, - message: "Missing orchard tree for block.".to_string(), - data: None, - })); - } - _ => unreachable!("Unexpected response type for OrchardTree"), - }; - - let orchard_tree_size = orchard_tree.count(); - let nu5_activation = NetworkUpgrade::Nu5.activation_height(&network); - - - // --- - - final_orchard_root = match nu5_activation { - Some(activation_height) if height >= activation_height => { - Some(orchard_tree.root().into()) - } - _ => None, - }; - - orchard_trees = Some(GetBlockTrees::new(sapling_tree_size, orchard_tree_size)); - } - } - } - - let tx = txids - .ok_or_else(|| StateServiceError::Custom("No txids found in block.".to_string()))? - .into_iter() - .map(|tx| { - tx.parse::() - .map(GetBlockTransaction::Hash) - }) - .collect::, _>>()?; - - let trees = orchard_trees - .ok_or_else(|| StateServiceError::Custom("No orchard trees found.".to_string()))?; - - Ok(GetBlock::Object { - hash, - confirmations, - height: Some(height), - version: Some(version), - merkle_root: Some(merkle_root), - time: Some(time), - nonce: Some(nonce), - solution: Some(solution), - bits: Some(bits), - difficulty: Some(difficulty), - tx, - trees, - size: None, - final_sapling_root: Some(final_sapling_root), - final_orchard_root, - previous_block_hash: Some(previous_block_hash), - next_block_hash, - }) - } else { - Err(StateServiceError::RpcError( - zaino_fetch::jsonrpc::connector::RpcError { - code: jsonrpc_core::ErrorCode::InvalidParams.code(), - message: "Invalid verbosity value".to_string(), - data: None, - }, - )) - } - } - /// Returns the requested block header by hash or height, as a [`GetBlockHeader`] JSON string. /// If the block is not in Zebra's state, /// returns [error code `-8`.](https://github.com/zcash/zcash/issues/5758) @@ -2006,7 +1691,7 @@ mod tests { let state_start = tokio::time::Instant::now(); let state_service_get_block = state_service - .get_block("1".to_string(), Some(0)) + .z_get_block("1".to_string(), Some(0)) .await .unwrap(); let state_service_duration = state_start.elapsed(); @@ -2049,7 +1734,7 @@ mod tests { let state_start = tokio::time::Instant::now(); let state_service_get_block = state_service - .get_block("1".to_string(), Some(1)) + .z_get_block("1".to_string(), Some(1)) .await .unwrap(); let state_service_duration = state_start.elapsed(); From 35cc3a85f34dff63668b91f5ddfac2ecc156feea Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Thu, 6 Feb 2025 15:17:00 -0400 Subject: [PATCH 06/10] clippy refactor --- zaino-state/src/local_cache.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/zaino-state/src/local_cache.rs b/zaino-state/src/local_cache.rs index 75fd71b1..81a49603 100644 --- a/zaino-state/src/local_cache.rs +++ b/zaino-state/src/local_cache.rs @@ -54,10 +54,10 @@ impl BlockCache { /// Returns a [`BlockCacheSubscriber`]. pub fn subscriber(&self) -> BlockCacheSubscriber { - let finalised_state_subscriber = match &self.finalised_state { - Some(finalised_state) => Some(finalised_state.subscriber()), - None => None, - }; + let finalised_state_subscriber = self + .finalised_state + .as_ref() + .map(FinalisedState::subscriber); BlockCacheSubscriber { fetcher: self.fetcher.clone(), non_finalised_state: self.non_finalised_state.subscriber(), From a44958428327611f02baf5c3278a3257a87729e7 Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Thu, 6 Feb 2025 18:15:51 -0400 Subject: [PATCH 07/10] update comment --- zaino-state/src/state.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index e69af8a9..e9367643 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -762,11 +762,8 @@ impl Drop for StateService { } } -/// This impl will hold the Zcash RPC method implementations for StateService. -/// -/// Doc comments are taken from Zebra for consistency. -/// -/// TODO: Update this to be `impl ZcashIndexer for StateService` once rpc methods are implemented and tested (or implement separately). +/// As get_block_header is not a part of the public interface, +/// it has not been moved to the ZcashIndexer trait. impl StateService { /// Returns the requested block header by hash or height, as a [`GetBlockHeader`] JSON string. /// If the block is not in Zebra's state, From 2ac08b7db677bac0242196b9e081b08f8fab02f4 Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Thu, 6 Feb 2025 18:21:28 -0400 Subject: [PATCH 08/10] move movement --- zaino-state/src/state.rs | 280 ++++++++++++++++++++------------------- 1 file changed, 141 insertions(+), 139 deletions(-) diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index e9367643..0b2c1ddd 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -762,8 +762,10 @@ impl Drop for StateService { } } -/// As get_block_header is not a part of the public interface, -/// it has not been moved to the ZcashIndexer trait. +/// Private RPC methods, which are used as helper methods by the public ones +/// +/// These would be simple to add to the public interface if +/// needed, there are currently no plans to do so. impl StateService { /// Returns the requested block header by hash or height, as a [`GetBlockHeader`] JSON string. /// If the block is not in Zebra's state, @@ -896,143 +898,6 @@ impl StateService { Ok(response) } -} - -/// This impl will hold the Lightwallet RPC method implementations for StateService. -/// -/// TODO: Update this to be `impl LightWalletIndexer for StateService` once rpc methods are implemented and tested (or implement separately). -impl StateService { - /// Return a list of consecutive compact blocks. - pub async fn get_block_range( - &self, - blockrange: BlockRange, - ) -> Result { - let mut start: u32 = match blockrange.start { - Some(block_id) => match block_id.height.try_into() { - Ok(height) => height, - Err(_) => { - return Err(StateServiceError::TonicStatusError( - tonic::Status::invalid_argument( - "Error: Start height out of range. Failed to convert to u32.", - ), - )); - } - }, - None => { - return Err(StateServiceError::TonicStatusError( - tonic::Status::invalid_argument("Error: No start height given."), - )); - } - }; - let mut end: u32 = match blockrange.end { - Some(block_id) => match block_id.height.try_into() { - Ok(height) => height, - Err(_) => { - return Err(StateServiceError::TonicStatusError( - tonic::Status::invalid_argument( - "Error: End height out of range. Failed to convert to u32.", - ), - )); - } - }, - None => { - return Err(StateServiceError::TonicStatusError( - tonic::Status::invalid_argument("Error: No start height given."), - )); - } - }; - let rev_order = if start > end { - (start, end) = (end, start); - true - } else { - false - }; - - let cloned_read_state_service = self.read_state_service.clone(); - let network = self.config.network.clone(); - let service_channel_size = self.config.service_channel_size; - let service_timeout = self.config.service_timeout; - let latest_chain_tip = self.latest_chain_tip.clone(); - let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(service_channel_size as usize); - tokio::spawn(async move { - let timeout = timeout( - std::time::Duration::from_secs(service_timeout as u64), - async { - for height in start..=end { - let height = if rev_order { - end - (height - start) - } else { - height - }; - match StateService::get_compact_block( - &cloned_read_state_service, - height.to_string(), - &network, - ).await { - Ok(block) => { - if channel_tx.send(Ok(block)).await.is_err() { - break; - }; - } - Err(e) => { - let chain_height = match latest_chain_tip.best_tip_height() { - Some(ch) => ch.0, - None => { - if let Err(e) = channel_tx - .send(Err(tonic::Status::unknown("No best tip height found"))) - .await - { - eprintln!("Error: channel closed unexpectedly: {e}"); - } - break; - } - }; - if height >= chain_height { - match channel_tx - .send(Err(tonic::Status::out_of_range(format!( - "Error: Height out of range [{}]. Height requested is greater than the best chain tip [{}].", - height, chain_height, - )))) - .await - - { - Ok(_) => break, - Err(e) => { - eprintln!("Error: Channel closed unexpectedly: {}", e); - break; - } - } - } else { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - if channel_tx - .send(Err(tonic::Status::unknown(e.to_string()))) - .await - .is_err() - { - break; - } - } - } - } - } - }, - ) - .await; - match timeout { - Ok(_) => {} - Err(_) => { - channel_tx - .send(Err(tonic::Status::deadline_exceeded( - "Error: get_block_range gRPC request timed out.", - ))) - .await - .ok(); - } - } - }); - - Ok(CompactBlockStream::new(channel_rx)) - } /// Returns a [`zaino_proto::proto::compact_formats::CompactTx`]. /// @@ -1227,6 +1092,143 @@ impl StateService { } } +/// This impl will hold the Lightwallet RPC method implementations for StateService. +/// +/// TODO: Update this to be `impl LightWalletIndexer for StateService` once rpc methods are implemented and tested (or implement separately). +impl StateService { + /// Return a list of consecutive compact blocks. + pub async fn get_block_range( + &self, + blockrange: BlockRange, + ) -> Result { + let mut start: u32 = match blockrange.start { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(StateServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: Start height out of range. Failed to convert to u32.", + ), + )); + } + }, + None => { + return Err(StateServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; + let mut end: u32 = match blockrange.end { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(StateServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: End height out of range. Failed to convert to u32.", + ), + )); + } + }, + None => { + return Err(StateServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; + let rev_order = if start > end { + (start, end) = (end, start); + true + } else { + false + }; + + let cloned_read_state_service = self.read_state_service.clone(); + let network = self.config.network.clone(); + let service_channel_size = self.config.service_channel_size; + let service_timeout = self.config.service_timeout; + let latest_chain_tip = self.latest_chain_tip.clone(); + let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(service_channel_size as usize); + tokio::spawn(async move { + let timeout = timeout( + std::time::Duration::from_secs(service_timeout as u64), + async { + for height in start..=end { + let height = if rev_order { + end - (height - start) + } else { + height + }; + match StateService::get_compact_block( + &cloned_read_state_service, + height.to_string(), + &network, + ).await { + Ok(block) => { + if channel_tx.send(Ok(block)).await.is_err() { + break; + }; + } + Err(e) => { + let chain_height = match latest_chain_tip.best_tip_height() { + Some(ch) => ch.0, + None => { + if let Err(e) = channel_tx + .send(Err(tonic::Status::unknown("No best tip height found"))) + .await + { + eprintln!("Error: channel closed unexpectedly: {e}"); + } + break; + } + }; + if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{}]. Height requested is greater than the best chain tip [{}].", + height, chain_height, + )))) + .await + + { + Ok(_) => break, + Err(e) => { + eprintln!("Error: Channel closed unexpectedly: {}", e); + break; + } + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + break; + } + } + } + } + } + }, + ) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::deadline_exceeded( + "Error: get_block_range gRPC request timed out.", + ))) + .await + .ok(); + } + } + }); + + Ok(CompactBlockStream::new(channel_rx)) + } +} + /// Converts a [`zebra_chain::transaction::Transaction`] into a [`zaino_proto::proto::compact_formats::CompactTx`]. /// /// Notes: From 0c72677d58b45bf6eb9ae800204d48042cf9420f Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Wed, 12 Feb 2025 13:19:44 -0400 Subject: [PATCH 09/10] add back new from legacycode --- zaino-fetch/src/jsonrpc/connector.rs | 11 +++++++++++ zaino-state/src/state.rs | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/zaino-fetch/src/jsonrpc/connector.rs b/zaino-fetch/src/jsonrpc/connector.rs index 4b1884e5..99b175c0 100644 --- a/zaino-fetch/src/jsonrpc/connector.rs +++ b/zaino-fetch/src/jsonrpc/connector.rs @@ -57,6 +57,17 @@ pub struct RpcError { } impl RpcError { + /// Creates a new `RpcError` from zebra's `LegacyCode` enum + pub fn new_from_legacycode( + code: zebra_rpc::server::error::LegacyCode, + message: impl Into, + ) -> Self { + RpcError { + code: code as i64, + message: message.into(), + data: None, + } + } /// Creates a new `RpcError` from jsonrpsee-types `ErrorObject`. pub fn new_from_errorobject( error_obj: jsonrpsee_types::ErrorObject<'_>, diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index dc126d42..fecdbef7 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -20,7 +20,7 @@ use tokio::time::timeout; use tonic::async_trait; use tower::Service; -use zaino_fetch::jsonrpc::connector::{test_node_and_return_uri, JsonRpcConnector, RpcError}; +use zaino_fetch::jsonrpc::connector::{JsonRpcConnector, RpcError}; use zaino_fetch::jsonrpc::response::TxidsResponse; use zaino_proto::proto::compact_formats::{ ChainMetadata, CompactBlock, CompactOrchardAction, CompactSaplingOutput, CompactSaplingSpend, From 7b1cef48a0071c3b5efd46c77c7efa5863aa7cfe Mon Sep 17 00:00:00 2001 From: Hazel OHearn Date: Wed, 12 Feb 2025 13:27:14 -0400 Subject: [PATCH 10/10] fix tests after interface change --- zaino-state/src/state.rs | 83 +++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/zaino-state/src/state.rs b/zaino-state/src/state.rs index fecdbef7..b8830de8 100644 --- a/zaino-state/src/state.rs +++ b/zaino-state/src/state.rs @@ -1300,8 +1300,10 @@ fn tx_to_compact( /// !!! NOTE / TODO: This code should be retested before continued development, once zebra regtest is fully operational. #[cfg(test)] mod tests { + use super::*; use futures::stream::StreamExt; + use zaino_fetch::jsonrpc::connector::test_node_and_return_url; use zaino_proto::proto::service::BlockId; use zaino_testutils::{TestManager, ZEBRAD_CHAIN_CACHE_BIN, ZEBRAD_TESTNET_CACHE_BIN}; use zebra_chain::parameters::Network; @@ -1332,10 +1334,9 @@ mod tests { debug_stop_at_height: None, debug_validity_check_interval: None, }, - SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), - test_manager.zebrad_rpc_listen_port, - ), + test_manager.zebrad_rpc_listen_address, + false, + None, None, None, None, @@ -1661,17 +1662,18 @@ mod tests { async fn state_service_regtest_get_block_raw() { let (mut test_manager, state_service) = create_test_manager_and_state_service(false, true, true, false).await; - let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new( - url::Url::parse(&format!( - "http://127.0.0.1:{}", - test_manager.zebrad_rpc_listen_port - )) - .expect("Failed to construct URL") - .as_str() - .try_into() - .expect("Failed to convert URL to URI"), - Some("xxxxxx".to_string()), - Some("xxxxxx".to_string()), + let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new_with_basic_auth( + test_node_and_return_url( + test_manager.zebrad_rpc_listen_address, + false, + None, + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await + .unwrap(), + "xxxxxx".to_string(), + "xxxxxx".to_string(), ) .unwrap(); @@ -1683,7 +1685,7 @@ mod tests { let state_service_duration = state_start.elapsed(); let fetch_start = tokio::time::Instant::now(); - let fetch_service_get_block = json_service + let fetch_service_get_block = fetch_service .get_block("1".to_string(), Some(0)) .await .unwrap(); @@ -1703,17 +1705,18 @@ mod tests { async fn state_service_regtest_get_block_object() { let (mut test_manager, state_service) = create_test_manager_and_state_service(false, true, true, false).await; - let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new( - url::Url::parse(&format!( - "http://127.0.0.1:{}", - test_manager.zebrad_rpc_listen_port - )) - .expect("Failed to construct URL") - .as_str() - .try_into() - .expect("Failed to convert URL to URI"), - Some("xxxxxx".to_string()), - Some("xxxxxx".to_string()), + let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new_with_basic_auth( + test_node_and_return_url( + test_manager.zebrad_rpc_listen_address, + false, + None, + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await + .unwrap(), + "xxxxxx".to_string(), + "xxxxxx".to_string(), ) .unwrap(); @@ -1725,7 +1728,7 @@ mod tests { let state_service_duration = state_start.elapsed(); let fetch_start = tokio::time::Instant::now(); - let fetch_service_get_block = json_service + let fetch_service_get_block = fetch_service .get_block("1".to_string(), Some(1)) .await .unwrap(); @@ -1881,19 +1884,19 @@ mod tests { let (mut test_manager, state_service) = create_test_manager_and_state_service(false, true, true, false).await; - let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new( - url::Url::parse(&format!( - "http://127.0.0.1:{}", - test_manager.zebrad_rpc_listen_port - )) - .expect("Failed to construct URL") - .as_str() - .try_into() - .expect("Failed to convert URL to URI"), - Some("xxxxxx".to_string()), - Some("xxxxxx".to_string()), + let fetch_service = zaino_fetch::jsonrpc::connector::JsonRpcConnector::new_with_basic_auth( + test_node_and_return_url( + test_manager.zebrad_rpc_listen_address, + false, + None, + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await + .unwrap(), + "xxxxxx".to_string(), + "xxxxxx".to_string(), ) - .await .unwrap(); let state_service_bcinfo = state_service.get_blockchain_info().await.unwrap(); let fetch_service_bcinfo = fetch_service.get_blockchain_info().await.unwrap();