diff --git a/grafana/transaction_verification.json b/grafana/transaction_verification.json new file mode 100644 index 00000000000..0e96e030d5c --- /dev/null +++ b/grafana/transaction_verification.json @@ -0,0 +1,207 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS-ZEBRA", + "label": "Prometheus-Zebra", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "8.1.2" + }, + { + "type": "panel", + "id": "graph", + "name": "Graph (old)", + "version": "" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": null, + "iteration": 1630092146360, + "links": [], + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 0 + }, + "hiddenSeries": false, + "id": 6, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "8.1.2", + "pointradius": 2, + "points": false, + "renderer": "flot", + "repeatDirection": "h", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "rate(gossip_downloaded_transaction_count{job=\"$job\"}[1m]) * 60", + "interval": "", + "legendFormat": "gossip_downloaded_transaction_count per min", + "refId": "C" + }, + { + "exemplar": true, + "expr": "rate(gossip_verified_transaction_count{job=\"$job\"}[1m]) * 60", + "interval": "", + "legendFormat": "gossip_verified_transaction_count per min", + "refId": "D" + }, + { + "exemplar": true, + "expr": "gossip_queued_transaction_count{job=\"$job\"}", + "interval": "", + "legendFormat": "gossip_queued_transaction_count", + "refId": "E" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Transaction Verifier Gossip Count - $job", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "refresh": "5s", + "schemaVersion": 30, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": null, + "current": {}, + "datasource": "${DS_PROMETHEUS-ZEBRA}", + "definition": "label_values(zcash_chain_verified_block_height, job)", + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": null, + "multi": true, + "name": "job", + "options": [], + "query": { + "query": "label_values(zcash_chain_verified_block_height, job)", + "refId": "StandardVariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tagsQuery": "", + "type": "query", + "useTags": false + } + ] + }, + "time": { + "from": "now-30m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "transaction verification", + "uid": "oBEHvS4nz", + "version": 2 +} \ No newline at end of file diff --git a/zebra-chain/src/transaction.rs b/zebra-chain/src/transaction.rs index a867d5390e9..544f969dd66 100644 --- a/zebra-chain/src/transaction.rs +++ b/zebra-chain/src/transaction.rs @@ -376,8 +376,9 @@ impl Transaction { } } - /// Returns `true` if this transaction is a coinbase transaction. - pub fn is_coinbase(&self) -> bool { + /// Returns `true` if this transaction has valid inputs for a coinbase + /// transaction, that is, has a single input and it is a coinbase input. + pub fn has_valid_coinbase_transaction_inputs(&self) -> bool { self.inputs().len() == 1 && matches!( self.inputs().get(0), @@ -386,7 +387,7 @@ impl Transaction { } /// Returns `true` if transaction contains any coinbase inputs. - pub fn contains_coinbase_input(&self) -> bool { + pub fn has_any_coinbase_inputs(&self) -> bool { self.inputs() .iter() .any(|input| matches!(input, transparent::Input::Coinbase { .. })) diff --git a/zebra-chain/src/transaction/arbitrary.rs b/zebra-chain/src/transaction/arbitrary.rs index c9cf3686cba..2a3c1144616 100644 --- a/zebra-chain/src/transaction/arbitrary.rs +++ b/zebra-chain/src/transaction/arbitrary.rs @@ -428,7 +428,7 @@ impl Transaction { &mut self, outputs: &HashMap, ) -> Result, ValueBalanceError> { - if self.is_coinbase() { + if self.has_valid_coinbase_transaction_inputs() { // TODO: if needed, fixup coinbase: // - miner subsidy // - founders reward or funding streams (hopefully not?) diff --git a/zebra-chain/src/transaction/unmined.rs b/zebra-chain/src/transaction/unmined.rs index afddc5e5403..22076bf6526 100644 --- a/zebra-chain/src/transaction/unmined.rs +++ b/zebra-chain/src/transaction/unmined.rs @@ -12,7 +12,7 @@ //! unmined transactions. They can be used to handle transactions regardless of version, //! and get the [`WtxId`] or [`Hash`] when required. -use std::sync::Arc; +use std::{fmt, sync::Arc}; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; @@ -93,6 +93,15 @@ impl From<&WtxId> for UnminedTxId { } } +impl fmt::Display for UnminedTxId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Legacy(hash) => hash.fmt(f), + Witnessed(id) => id.fmt(f), + } + } +} + impl UnminedTxId { /// Create a new `UnminedTxId` using a v1-v4 legacy transaction ID. /// diff --git a/zebra-chain/src/transparent/utxo.rs b/zebra-chain/src/transparent/utxo.rs index 99c5586b3fb..9362266df67 100644 --- a/zebra-chain/src/transparent/utxo.rs +++ b/zebra-chain/src/transparent/utxo.rs @@ -157,7 +157,7 @@ pub(crate) fn new_transaction_ordered_outputs( ) -> HashMap { let mut new_ordered_outputs = HashMap::new(); - let from_coinbase = transaction.is_coinbase(); + let from_coinbase = transaction.has_valid_coinbase_transaction_inputs(); for (output_index_in_transaction, output) in transaction.outputs().iter().cloned().enumerate() { let output_index_in_transaction = output_index_in_transaction .try_into() diff --git a/zebra-consensus/src/block/check.rs b/zebra-consensus/src/block/check.rs index e7620067b19..71d4de2e03e 100644 --- a/zebra-consensus/src/block/check.rs +++ b/zebra-consensus/src/block/check.rs @@ -27,10 +27,10 @@ pub fn coinbase_is_first(block: &Block) -> Result<(), BlockError> { .get(0) .ok_or(BlockError::NoTransactions)?; let mut rest = block.transactions.iter().skip(1); - if !first.is_coinbase() { + if !first.has_valid_coinbase_transaction_inputs() { return Err(TransactionError::CoinbasePosition)?; } - if rest.any(|tx| tx.contains_coinbase_input()) { + if rest.any(|tx| tx.has_any_coinbase_inputs()) { return Err(TransactionError::CoinbaseAfterFirst)?; } diff --git a/zebra-consensus/src/error.rs b/zebra-consensus/src/error.rs index 50a1ad7523a..499b679040f 100644 --- a/zebra-consensus/src/error.rs +++ b/zebra-consensus/src/error.rs @@ -41,6 +41,9 @@ pub enum TransactionError { #[error("coinbase transaction MUST NOT have the EnableSpendsOrchard flag set")] CoinbaseHasEnableSpendsOrchard, + #[error("coinbase inputs MUST NOT exist in mempool")] + CoinbaseInMempool, + #[error("coinbase transaction failed subsidy validation")] Subsidy(#[from] SubsidyError), diff --git a/zebra-consensus/src/lib.rs b/zebra-consensus/src/lib.rs index cf32a3c52db..ca55b0bd932 100644 --- a/zebra-consensus/src/lib.rs +++ b/zebra-consensus/src/lib.rs @@ -47,7 +47,7 @@ mod config; mod parameters; mod primitives; mod script; -mod transaction; +pub mod transaction; pub mod chain; #[allow(missing_docs)] diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 2126218e648..c1456189cee 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -1,3 +1,5 @@ +//! Asynchronous verification of transactions. +//! use std::{ collections::HashMap, future::Future, @@ -50,6 +52,7 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send + 'static, { + /// Create a new transaction verifier. pub fn new(network: Network, script_verifier: script::Verifier) -> Self { Self { network, @@ -165,11 +168,6 @@ where // TODO: break up each chunk into its own method fn call(&mut self, req: Request) -> Self::Future { - if req.is_mempool() { - // XXX determine exactly which rules apply to mempool transactions - unimplemented!("Zebra does not yet have a mempool (#2309)"); - } - let script_verifier = self.script_verifier.clone(); let network = self.network; @@ -183,7 +181,10 @@ where // Do basic checks first check::has_inputs_and_outputs(&tx)?; - if tx.is_coinbase() { + if req.is_mempool() && tx.has_any_coinbase_inputs() { + return Err(TransactionError::CoinbaseInMempool); + } + if tx.has_valid_coinbase_transaction_inputs() { check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?; } @@ -278,6 +279,9 @@ where ) -> Result { let tx = request.transaction(); let upgrade = request.upgrade(network); + + Self::verify_v4_transaction_network_upgrade(&tx, upgrade)?; + let shielded_sighash = tx.sighash(upgrade, HashType::ALL, None); Ok( @@ -298,6 +302,36 @@ where ) } + /// Verifies if a V4 `transaction` is supported by `network_upgrade`. + fn verify_v4_transaction_network_upgrade( + transaction: &Transaction, + network_upgrade: NetworkUpgrade, + ) -> Result<(), TransactionError> { + match network_upgrade { + // Supports V4 transactions + // + // Consensus rules: + // > [Sapling to Canopy inclusive, pre-NU5] The transaction version number MUST be 4, ... + // > + // > [NU5 onward] The transaction version number MUST be 4 or 5. + // + // https://zips.z.cash/protocol/protocol.pdf#txnconsensus + NetworkUpgrade::Sapling + | NetworkUpgrade::Blossom + | NetworkUpgrade::Heartwood + | NetworkUpgrade::Canopy + | NetworkUpgrade::Nu5 => Ok(()), + + // Does not support V4 transactions + NetworkUpgrade::Genesis + | NetworkUpgrade::BeforeOverwinter + | NetworkUpgrade::Overwinter => Err(TransactionError::UnsupportedByNetworkUpgrade( + transaction.version(), + network_upgrade, + )), + } + } + /// Verify a V5 transaction. /// /// Returns a set of asynchronous checks that must all succeed for the transaction to be @@ -327,10 +361,11 @@ where ) -> Result { let transaction = request.transaction(); let upgrade = request.upgrade(network); - let shielded_sighash = transaction.sighash(upgrade, HashType::ALL, None); Self::verify_v5_transaction_network_upgrade(&transaction, upgrade)?; + let shielded_sighash = transaction.sighash(upgrade, HashType::ALL, None); + let _async_checks = Self::verify_transparent_inputs_and_outputs( &request, network, @@ -363,6 +398,11 @@ where ) -> Result<(), TransactionError> { match network_upgrade { // Supports V5 transactions + // + // Consensus rules: + // > [NU5 onward] The transaction version number MUST be 4 or 5. + // + // https://zips.z.cash/protocol/protocol.pdf#txnconsensus NetworkUpgrade::Nu5 => Ok(()), // Does not support V5 transactions @@ -389,7 +429,7 @@ where ) -> Result { let transaction = request.transaction(); - if transaction.is_coinbase() { + if transaction.has_valid_coinbase_transaction_inputs() { // The script verifier only verifies PrevOut inputs and their corresponding UTXOs. // Coinbase transactions don't have any PrevOut inputs. Ok(AsyncChecks::new()) diff --git a/zebra-consensus/src/transaction/check.rs b/zebra-consensus/src/transaction/check.rs index 7dfb224111d..3dd667f4a20 100644 --- a/zebra-consensus/src/transaction/check.rs +++ b/zebra-consensus/src/transaction/check.rs @@ -51,7 +51,7 @@ pub fn has_inputs_and_outputs(tx: &Transaction) -> Result<(), TransactionError> /// /// https://zips.z.cash/protocol/protocol.pdf#txnencodingandconsensus pub fn coinbase_tx_no_prevout_joinsplit_spend(tx: &Transaction) -> Result<(), TransactionError> { - if tx.is_coinbase() { + if tx.has_valid_coinbase_transaction_inputs() { if tx.contains_prevout_input() { return Err(TransactionError::CoinbaseHasPrevOutInput); } else if tx.joinsplit_count() > 0 { diff --git a/zebra-consensus/src/transaction/tests.rs b/zebra-consensus/src/transaction/tests.rs index 800b06db8d1..2efb92b377d 100644 --- a/zebra-consensus/src/transaction/tests.rs +++ b/zebra-consensus/src/transaction/tests.rs @@ -165,7 +165,7 @@ fn v5_coinbase_transaction_without_enable_spends_flag_passes_validation() { zebra_test::vectors::MAINNET_BLOCKS.iter(), ) .rev() - .find(|transaction| transaction.is_coinbase()) + .find(|transaction| transaction.has_valid_coinbase_transaction_inputs()) .expect("At least one fake V5 coinbase transaction in the test vectors"); insert_fake_orchard_shielded_data(&mut transaction); @@ -180,7 +180,7 @@ fn v5_coinbase_transaction_with_enable_spends_flag_fails_validation() { zebra_test::vectors::MAINNET_BLOCKS.iter(), ) .rev() - .find(|transaction| transaction.is_coinbase()) + .find(|transaction| transaction.has_valid_coinbase_transaction_inputs()) .expect("At least one fake V5 coinbase transaction in the test vectors"); let shielded_data = insert_fake_orchard_shielded_data(&mut transaction); @@ -702,7 +702,8 @@ fn v4_with_sapling_spends() { let (height, transaction) = test_transactions(network) .rev() .filter(|(_, transaction)| { - !transaction.is_coinbase() && transaction.inputs().is_empty() + !transaction.has_valid_coinbase_transaction_inputs() + && transaction.inputs().is_empty() }) .find(|(_, transaction)| transaction.sapling_spends_per_anchor().next().is_some()) .expect("No transaction found with Sapling spends"); @@ -739,7 +740,8 @@ fn v4_with_sapling_outputs_and_no_spends() { let (height, transaction) = test_transactions(network) .rev() .filter(|(_, transaction)| { - !transaction.is_coinbase() && transaction.inputs().is_empty() + !transaction.has_valid_coinbase_transaction_inputs() + && transaction.inputs().is_empty() }) .find(|(_, transaction)| { transaction.sapling_spends_per_anchor().next().is_none() @@ -781,7 +783,10 @@ fn v5_with_sapling_spends() { let transaction = fake_v5_transactions_for_network(network, zebra_test::vectors::MAINNET_BLOCKS.iter()) .rev() - .filter(|transaction| !transaction.is_coinbase() && transaction.inputs().is_empty()) + .filter(|transaction| { + !transaction.has_valid_coinbase_transaction_inputs() + && transaction.inputs().is_empty() + }) .find(|transaction| transaction.sapling_spends_per_anchor().next().is_some()) .expect("No transaction found with Sapling spends"); diff --git a/zebra-state/src/service/check/utxo.rs b/zebra-state/src/service/check/utxo.rs index d402b551c15..927aa4b7036 100644 --- a/zebra-state/src/service/check/utxo.rs +++ b/zebra-state/src/service/check/utxo.rs @@ -227,7 +227,7 @@ pub fn remaining_transaction_value( ) -> Result<(), ValidateContextError> { for (tx_index_in_block, transaction) in prepared.block.transactions.iter().enumerate() { // TODO: check coinbase transaction remaining value (#338, #1162) - if transaction.is_coinbase() { + if transaction.has_valid_coinbase_transaction_inputs() { continue; } diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index 2f041d0dfb6..814d65e3c50 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -91,7 +91,7 @@ async fn test_populated_state_responds_correctly( Ok(Response::Transaction(Some(transaction.clone()))), )); - let from_coinbase = transaction.is_coinbase(); + let from_coinbase = transaction.has_valid_coinbase_transaction_inputs(); for (index, output) in transaction.outputs().iter().cloned().enumerate() { let outpoint = transparent::OutPoint { hash: transaction_hash, diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 8212d12710b..54ada46b02d 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -57,7 +57,7 @@ impl StartCmd { info!("initializing verifiers"); // TODO: use the transaction verifier to verify mempool transactions (#2637, #2606) - let (chain_verifier, _tx_verifier) = zebra_consensus::chain::init( + let (chain_verifier, tx_verifier) = zebra_consensus::chain::init( config.consensus.clone(), config.network.network, state.clone(), @@ -76,6 +76,7 @@ impl StartCmd { setup_rx, state.clone(), chain_verifier.clone(), + tx_verifier.clone(), )); let (peer_set, address_book) = diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index b6372efd68a..271b5126b6a 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -17,19 +17,28 @@ use zebra_network as zn; use zebra_state as zs; use zebra_chain::block::{self, Block}; -use zebra_consensus::chain::VerifyChainError; +use zebra_consensus::transaction; +use zebra_consensus::{chain::VerifyChainError, error::TransactionError}; use zebra_network::AddressBook; +use super::mempool::downloads::{ + Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, +}; // Re-use the syncer timeouts for consistency. use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; mod downloads; -use downloads::Downloads; +use downloads::Downloads as BlockDownloads; type Outbound = Buffer, zn::Request>; type State = Buffer, zs::Request>; -type Verifier = Buffer, block::Hash, VerifyChainError>, Arc>; -type InboundDownloads = Downloads, Timeout, State>; +type BlockVerifier = Buffer, block::Hash, VerifyChainError>, Arc>; +type TxVerifier = Buffer< + BoxService, + transaction::Request, +>; +type InboundBlockDownloads = BlockDownloads, Timeout, State>; +type InboundTxDownloads = TxDownloads, Timeout, State>; pub type NetworkSetupData = (Outbound, Arc>); @@ -44,9 +53,13 @@ pub enum Setup { /// after the network is set up. network_setup: oneshot::Receiver, - /// A service that verifies downloaded blocks. Given to `downloads` + /// A service that verifies downloaded blocks. Given to `block_downloads` + /// after the network is set up. + block_verifier: BlockVerifier, + + /// A service that verifies downloaded transactions. Given to `tx_downloads` /// after the network is set up. - verifier: Verifier, + tx_verifier: TxVerifier, }, /// Network setup is complete. @@ -57,7 +70,9 @@ pub enum Setup { address_book: Arc>, /// A `futures::Stream` that downloads and verifies gossiped blocks. - downloads: Pin>, + block_downloads: Pin>, + + tx_downloads: Pin>, }, /// Temporary state used in the service's internal network initialization @@ -117,12 +132,14 @@ impl Inbound { pub fn new( network_setup: oneshot::Receiver, state: State, - verifier: Verifier, + block_verifier: BlockVerifier, + tx_verifier: TxVerifier, ) -> Self { Self { network_setup: Setup::AwaitingNetwork { network_setup, - verifier, + block_verifier, + tx_verifier, }, state, } @@ -154,18 +171,25 @@ impl Service for Inbound { self.network_setup = match self.take_setup() { Setup::AwaitingNetwork { mut network_setup, - verifier, + block_verifier, + tx_verifier, } => match network_setup.try_recv() { Ok((outbound, address_book)) => { - let downloads = Box::pin(Downloads::new( - Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), - Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT), + let block_downloads = Box::pin(BlockDownloads::new( + Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT), + Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT), + self.state.clone(), + )); + let tx_downloads = Box::pin(TxDownloads::new( + Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT), + Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT), self.state.clone(), )); result = Ok(()); Setup::Initialized { address_book, - downloads, + block_downloads, + tx_downloads, } } Err(TryRecvError::Empty) => { @@ -173,7 +197,8 @@ impl Service for Inbound { result = Ok(()); Setup::AwaitingNetwork { network_setup, - verifier, + block_verifier, + tx_verifier, } } Err(error @ TryRecvError::Closed) => { @@ -194,14 +219,17 @@ impl Service for Inbound { // Clean up completed download tasks, ignoring their results Setup::Initialized { address_book, - mut downloads, + mut block_downloads, + mut tx_downloads, } => { - while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} + while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {} + while let Poll::Ready(Some(_)) = tx_downloads.as_mut().poll_next(cx) {} result = Ok(()); Setup::Initialized { address_book, - downloads, + block_downloads, + tx_downloads, } } }; @@ -312,15 +340,29 @@ impl Service for Inbound { } zn::Request::PushTransaction(_transaction) => { debug!("ignoring unimplemented request"); + // TODO: send to Tx Download & Verify Stream async { Ok(zn::Response::Nil) }.boxed() } - zn::Request::AdvertiseTransactionIds(_transactions) => { - debug!("ignoring unimplemented request"); + zn::Request::AdvertiseTransactionIds(transactions) => { + if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup { + // TODO: check if we're close to the tip before proceeding? + // what do we do if it's not? + for txid in transactions { + tx_downloads.download_and_verify(txid); + } + } else { + info!( + "ignoring `AdvertiseTransactionIds` request from remote peer during network setup" + ); + } async { Ok(zn::Response::Nil) }.boxed() } zn::Request::AdvertiseBlock(hash) => { - if let Setup::Initialized { downloads, .. } = &mut self.network_setup { - downloads.download_and_verify(hash); + if let Setup::Initialized { + block_downloads, .. + } = &mut self.network_setup + { + block_downloads.download_and_verify(hash); } else { info!( ?hash, diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index 320993e85b5..f832f382a98 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -32,10 +32,8 @@ type BoxError = Box; /// attacks. /// /// The maximum block size is 2 million bytes. A deserialized malicious -/// block with ~225_000 transparent outputs can take up 9MB of RAM. As of -/// February 2021, a growing `Vec` can allocate up to 2x its current length, -/// leading to an overall memory usage of 18MB per malicious block. (See -/// #1880 for more details.) +/// block with ~225_000 transparent outputs can take up 9MB of RAM. +/// (See #1880 for more details.) /// /// Malicious blocks will eventually timeout or fail contextual validation. /// Once validation fails, the block is dropped, and its memory is deallocated. @@ -116,8 +114,7 @@ where // If no download and verify tasks have exited since the last poll, this // task is scheduled for wakeup when the next task becomes ready. // - // TODO: - // This would be cleaner with poll_map #63514, but that's nightly only. + // TODO: this would be cleaner with poll_map (#2693) if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("block download and verify tasks must not panic") { Ok(hash) => { @@ -245,7 +242,6 @@ where }); self.pending.push(task); - // XXX replace with expect_none when stable assert!( self.cancel_handles.insert(hash, cancel_tx).is_none(), "blocks are only queued once" diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 7d419d0c7e6..278fcb802be 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -18,6 +18,7 @@ use zebra_chain::{ use crate::BoxError; mod crawler; +pub mod downloads; mod error; mod storage; diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs new file mode 100644 index 00000000000..56b51bd2daa --- /dev/null +++ b/zebrad/src/components/mempool/downloads.rs @@ -0,0 +1,301 @@ +use std::{ + collections::HashMap, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use color_eyre::eyre::eyre; +use futures::{ + future::TryFutureExt, + ready, + stream::{FuturesUnordered, Stream}, +}; +use pin_project::pin_project; +use tokio::{sync::oneshot, task::JoinHandle}; +use tower::{Service, ServiceExt}; +use tracing_futures::Instrument; + +use zebra_chain::transaction::UnminedTxId; +use zebra_consensus::transaction as tx; +use zebra_network as zn; +use zebra_state as zs; + +use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; + +type BoxError = Box; + +/// Controls how long we wait for a transaction download request to complete. +/// +/// This is currently equal to [`crate::components::sync::BLOCK_DOWNLOAD_TIMEOUT`] for +/// consistency, even though parts of the rationale used for defining the value +/// don't apply here (e.g. we can drop transactions hashes when the queue is full). +pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT; + +/// Controls how long we wait for a transaction verify request to complete. +/// +/// This is currently equal to [`crate::components::sync::BLOCK_VERIFY_TIMEOUT`] for +/// consistency. +/// +/// This timeout may lead to denial of service, which will be handled in +/// https://github.com/ZcashFoundation/zebra/issues/2694 +pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT; + +/// The maximum number of concurrent inbound download and verify tasks. +/// +/// We expect the mempool crawler to download and verify most mempool transactions, so this bound +/// can be small. +/// +/// ## Security +/// +/// We use a small concurrency limit, to prevent memory denial-of-service +/// attacks. +/// +/// The maximum transaction size is 2 million bytes. A deserialized malicious +/// transaction with ~225_000 transparent outputs can take up 9MB of RAM. +/// (See #1880 for more details.) +/// +/// Malicious transactions will eventually timeout or fail validation. +/// Once validation fails, the transaction is dropped, and its memory is deallocated. +/// +/// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions +/// will be directed to the malicious node that originally gossiped the hash. +/// Therefore, this attack can be carried out by a single malicious node. +const MAX_INBOUND_CONCURRENCY: usize = 10; + +/// The action taken in response to a peer's gossiped transaction hash. +pub enum DownloadAction { + /// The transaction hash was successfully queued for download and verification. + AddedToQueue, + + /// The transaction hash is already queued, so this request was ignored. + /// + /// Another peer has already gossiped the same hash to us, or the mempool crawler has fetched it. + AlreadyQueued, + + /// The queue is at capacity, so this request was ignored. + /// + /// The mempool crawler should discover this transaction later. + /// If it is mined into a block, it will be downloaded by the syncer, or the inbound block downloader. + /// + /// The queue's capacity is [`MAX_INBOUND_CONCURRENCY`]. + FullQueue, +} + +/// Represents a [`Stream`] of download and verification tasks. +#[pin_project] +#[derive(Debug)] +pub struct Downloads +where + ZN: Service + Send + 'static, + ZN::Future: Send, + ZV: Service + Send + Clone + 'static, + ZV::Future: Send, + ZS: Service + Send + Clone + 'static, + ZS::Future: Send, +{ + // Services + /// A service that forwards requests to connected peers, and returns their + /// responses. + network: ZN, + + /// A service that verifies downloaded transactions. + verifier: ZV, + + /// A service that manages cached blockchain state. + state: ZS, + + // Internal downloads state + /// A list of pending transaction download and verify tasks. + #[pin] + pending: FuturesUnordered>>, + + /// A list of channels that can be used to cancel pending transaction download and + /// verify tasks. + cancel_handles: HashMap>, +} + +impl Stream for Downloads +where + ZN: Service + Send + Clone + 'static, + ZN::Future: Send, + ZV: Service + Send + Clone + 'static, + ZV::Future: Send, + ZS: Service + Send + Clone + 'static, + ZS::Future: Send, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // If no download and verify tasks have exited since the last poll, this + // task is scheduled for wakeup when the next task becomes ready. + // + // TODO: this would be cleaner with poll_map (#2693) + if let Some(join_result) = ready!(this.pending.poll_next(cx)) { + match join_result.expect("transaction download and verify tasks must not panic") { + Ok(hash) => { + this.cancel_handles.remove(&hash); + Poll::Ready(Some(Ok(hash))) + } + Err((e, hash)) => { + this.cancel_handles.remove(&hash); + Poll::Ready(Some(Err(e))) + } + } + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option) { + self.pending.size_hint() + } +} + +impl Downloads +where + ZN: Service + Send + Clone + 'static, + ZN::Future: Send, + ZV: Service + Send + Clone + 'static, + ZV::Future: Send, + ZS: Service + Send + Clone + 'static, + ZS::Future: Send, +{ + /// Initialize a new download stream with the provided `network` and + /// `verifier` services. + /// + /// The [`Downloads`] stream is agnostic to the network policy, so retry and + /// timeout limits should be applied to the `network` service passed into + /// this constructor. + pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self { + Self { + network, + verifier, + state, + pending: FuturesUnordered::new(), + cancel_handles: HashMap::new(), + } + } + + /// Queue a transaction for download and verification. + /// + /// Returns the action taken in response to the queue request. + #[instrument(skip(self, txid), fields(txid = %txid))] + pub fn download_and_verify(&mut self, txid: UnminedTxId) -> DownloadAction { + if self.cancel_handles.contains_key(&txid) { + tracing::debug!( + ?txid, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "transaction id already queued for inbound download: ignored transaction" + ); + return DownloadAction::AlreadyQueued; + } + + if self.pending.len() >= MAX_INBOUND_CONCURRENCY { + tracing::info!( + ?txid, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "too many transactions queued for inbound download: ignored transaction" + ); + return DownloadAction::FullQueue; + } + + // This oneshot is used to signal cancellation to the download task. + let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); + + let network = self.network.clone(); + let verifier = self.verifier.clone(); + let state = self.state.clone(); + + let fut = async move { + // TODO: adapt this for transaction / mempool + // // Check if the block is already in the state. + // // BUG: check if the hash is in any chain (#862). + // // Depth only checks the main chain. + // match state.oneshot(zs::Request::Depth(hash)).await { + // Ok(zs::Response::Depth(None)) => Ok(()), + // Ok(zs::Response::Depth(Some(_))) => Err("already present".into()), + // Ok(_) => unreachable!("wrong response"), + // Err(e) => Err(e), + // }?; + + let height = match state.oneshot(zs::Request::Tip).await { + Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()), + Ok(zs::Response::Tip(Some((height, _hash)))) => Ok(height), + Ok(_) => unreachable!("wrong response"), + Err(e) => Err(e), + }?; + let height = (height + 1).ok_or_else(|| eyre!("no next height"))?; + + let tx = if let zn::Response::Transactions(txs) = network + .oneshot(zn::Request::TransactionsById( + std::iter::once(txid).collect(), + )) + .await? + { + txs.into_iter() + .next() + .expect("successful response has the transaction in it") + } else { + unreachable!("wrong response to transaction request"); + }; + metrics::counter!("gossip.downloaded.transaction.count", 1); + + let result = verifier + .oneshot(tx::Request::Mempool { + transaction: tx, + height, + }) + .await; + + tracing::debug!(?txid, ?result, "verified transaction for the mempool"); + + result + } + .map_ok(|hash| { + metrics::counter!("gossip.verified.transaction.count", 1); + hash + }) + // Tack the hash onto the error so we can remove the cancel handle + // on failure as well as on success. + .map_err(move |e| (e, txid)) + .in_current_span(); + + let task = tokio::spawn(async move { + // TODO: if the verifier and cancel are both ready, which should we + // prefer? (Currently, select! chooses one at random.) + tokio::select! { + _ = &mut cancel_rx => { + tracing::trace!("task cancelled prior to completion"); + metrics::counter!("gossip.cancelled.count", 1); + Err(("canceled".into(), txid)) + } + verification = fut => verification, + } + }); + + self.pending.push(task); + assert!( + self.cancel_handles.insert(txid, cancel_tx).is_none(), + "transactions are only queued once" + ); + + tracing::debug!( + ?txid, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "queued transaction hash for download" + ); + metrics::gauge!("gossip.queued.transaction.count", self.pending.len() as _); + + DownloadAction::AddedToQueue + } +} diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 11f7ab6ddbb..ce5089c2d32 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -80,8 +80,7 @@ where // If no download and verify tasks have exited since the last poll, this // task is scheduled for wakeup when the next task becomes ready. // - // TODO: - // This would be cleaner with poll_map #63514, but that's nightly only. + // TODO: this would be cleaner with poll_map (#2693) if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("block download and verify tasks must not panic") { Ok(hash) => { @@ -203,7 +202,6 @@ where ); self.pending.push(task); - // XXX replace with expect_none when stable assert!( self.cancel_handles.insert(hash, cancel_tx).is_none(), "blocks are only queued once"