From 1eb5720049528dfa70a6257ddc5def1f17fc649d Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Thu, 26 Aug 2021 14:24:48 -0300 Subject: [PATCH 01/13] Add transaction downloader --- zebra-chain/src/transaction/unmined.rs | 11 +- zebra-consensus/src/lib.rs | 2 +- zebra-consensus/src/transaction.rs | 3 + zebrad/src/commands/start.rs | 3 +- zebrad/src/components/inbound.rs | 31 ++- zebrad/src/components/mempool.rs | 1 + zebrad/src/components/mempool/downloads.rs | 233 +++++++++++++++++++++ 7 files changed, 279 insertions(+), 5 deletions(-) create mode 100644 zebrad/src/components/mempool/downloads.rs diff --git a/zebra-chain/src/transaction/unmined.rs b/zebra-chain/src/transaction/unmined.rs index afddc5e5403..22076bf6526 100644 --- a/zebra-chain/src/transaction/unmined.rs +++ b/zebra-chain/src/transaction/unmined.rs @@ -12,7 +12,7 @@ //! unmined transactions. They can be used to handle transactions regardless of version, //! and get the [`WtxId`] or [`Hash`] when required. -use std::sync::Arc; +use std::{fmt, sync::Arc}; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; @@ -93,6 +93,15 @@ impl From<&WtxId> for UnminedTxId { } } +impl fmt::Display for UnminedTxId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Legacy(hash) => hash.fmt(f), + Witnessed(id) => id.fmt(f), + } + } +} + impl UnminedTxId { /// Create a new `UnminedTxId` using a v1-v4 legacy transaction ID. /// diff --git a/zebra-consensus/src/lib.rs b/zebra-consensus/src/lib.rs index cf32a3c52db..ca55b0bd932 100644 --- a/zebra-consensus/src/lib.rs +++ b/zebra-consensus/src/lib.rs @@ -47,7 +47,7 @@ mod config; mod parameters; mod primitives; mod script; -mod transaction; +pub mod transaction; pub mod chain; #[allow(missing_docs)] diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 2126218e648..42b0105fbe5 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -1,3 +1,5 @@ +//! Asynchronous verification of transactions. +//! use std::{ collections::HashMap, future::Future, @@ -50,6 +52,7 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send + 'static, { + /// Create a new transaction verifier. pub fn new(network: Network, script_verifier: script::Verifier) -> Self { Self { network, diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 6f4a860a86a..0c1fb8cbd1a 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -56,7 +56,7 @@ impl StartCmd { info!("initializing verifiers"); // TODO: use the transaction verifier to verify mempool transactions (#2637, #2606) - let (chain_verifier, _tx_verifier) = zebra_consensus::chain::init( + let (chain_verifier, tx_verifier) = zebra_consensus::chain::init( config.consensus.clone(), config.network.network, state.clone(), @@ -75,6 +75,7 @@ impl StartCmd { setup_rx, state.clone(), chain_verifier.clone(), + tx_verifier.clone(), )); let (peer_set, address_book) = diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index b6372efd68a..a3005d2ae90 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -17,10 +17,12 @@ use zebra_network as zn; use zebra_state as zs; use zebra_chain::block::{self, Block}; -use zebra_consensus::chain::VerifyChainError; +use zebra_consensus::transaction; +use zebra_consensus::{chain::VerifyChainError, error::TransactionError}; use zebra_network::AddressBook; // Re-use the syncer timeouts for consistency. +use super::mempool::downloads::Downloads as TxDownloads; use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; mod downloads; @@ -29,7 +31,12 @@ use downloads::Downloads; type Outbound = Buffer, zn::Request>; type State = Buffer, zs::Request>; type Verifier = Buffer, block::Hash, VerifyChainError>, Arc>; +type TxVerifier = Buffer< + BoxService, + transaction::Request, +>; type InboundDownloads = Downloads, Timeout, State>; +type InboundTxDownloads = TxDownloads, Timeout>; pub type NetworkSetupData = (Outbound, Arc>); @@ -47,6 +54,10 @@ pub enum Setup { /// A service that verifies downloaded blocks. Given to `downloads` /// after the network is set up. verifier: Verifier, + + /// A service that verifies downloaded transactions. Given to `tx_downloads` + /// after the network is set up. + tx_verifier: TxVerifier, }, /// Network setup is complete. @@ -58,6 +69,8 @@ pub enum Setup { /// A `futures::Stream` that downloads and verifies gossiped blocks. downloads: Pin>, + + tx_downloads: Pin>, }, /// Temporary state used in the service's internal network initialization @@ -118,11 +131,13 @@ impl Inbound { network_setup: oneshot::Receiver, state: State, verifier: Verifier, + tx_verifier: TxVerifier, ) -> Self { Self { network_setup: Setup::AwaitingNetwork { network_setup, verifier, + tx_verifier, }, state, } @@ -155,17 +170,23 @@ impl Service for Inbound { Setup::AwaitingNetwork { mut network_setup, verifier, + tx_verifier, } => match network_setup.try_recv() { Ok((outbound, address_book)) => { let downloads = Box::pin(Downloads::new( - Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), + Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT), Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT), self.state.clone(), )); + let tx_downloads = Box::pin(TxDownloads::new( + Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), + Timeout::new(tx_verifier, BLOCK_VERIFY_TIMEOUT), + )); result = Ok(()); Setup::Initialized { address_book, downloads, + tx_downloads, } } Err(TryRecvError::Empty) => { @@ -174,6 +195,7 @@ impl Service for Inbound { Setup::AwaitingNetwork { network_setup, verifier, + tx_verifier, } } Err(error @ TryRecvError::Closed) => { @@ -195,13 +217,16 @@ impl Service for Inbound { Setup::Initialized { address_book, mut downloads, + mut tx_downloads, } => { while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} + while let Poll::Ready(Some(_)) = tx_downloads.as_mut().poll_next(cx) {} result = Ok(()); Setup::Initialized { address_book, downloads, + tx_downloads, } } }; @@ -312,10 +337,12 @@ impl Service for Inbound { } zn::Request::PushTransaction(_transaction) => { debug!("ignoring unimplemented request"); + // TODO: send to Tx Download & Verify Stream async { Ok(zn::Response::Nil) }.boxed() } zn::Request::AdvertiseTransactionIds(_transactions) => { debug!("ignoring unimplemented request"); + // TODO: send to Tx Download & Verify Stream async { Ok(zn::Response::Nil) }.boxed() } zn::Request::AdvertiseBlock(hash) => { diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 7d419d0c7e6..187c599f98f 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -20,6 +20,7 @@ use crate::BoxError; mod crawler; mod error; mod storage; +pub mod downloads; #[cfg(test)] mod tests; diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs new file mode 100644 index 00000000000..f97367871f7 --- /dev/null +++ b/zebrad/src/components/mempool/downloads.rs @@ -0,0 +1,233 @@ +use std::{ + collections::HashMap, + pin::Pin, + task::{Context, Poll}, +}; + +use color_eyre::eyre::{eyre, Report}; +use futures::{ + future::TryFutureExt, + ready, + stream::{FuturesUnordered, Stream}, +}; +use pin_project::pin_project; +use tokio::{sync::oneshot, task::JoinHandle}; +use tower::{hedge, Service, ServiceExt}; +use tracing_futures::Instrument; + +use zebra_chain::{block::Height, transaction::UnminedTxId}; +use zebra_consensus::transaction as tx; +use zebra_network as zn; + +type BoxError = Box; + +#[derive(Copy, Clone, Debug)] +pub(crate) struct AlwaysHedge; + +impl hedge::Policy for AlwaysHedge { + fn can_retry(&self, _req: &Request) -> bool { + true + } + fn clone_request(&self, req: &Request) -> Option { + Some(req.clone()) + } +} + +/// Represents a [`Stream`] of download and verification tasks. +#[pin_project] +#[derive(Debug)] +pub struct Downloads +where + ZN: Service + Send + 'static, + ZN::Future: Send, + ZV: Service + Send + Clone + 'static, + ZV::Future: Send, +{ + // Services + /// A service that forwards requests to connected peers, and returns their + /// responses. + network: ZN, + + /// A service that verifies downloaded transactions. + verifier: ZV, + + // Internal downloads state + /// A list of pending transaction download and verify tasks. + #[pin] + pending: FuturesUnordered>>, + + /// A list of channels that can be used to cancel pending transaction download and + /// verify tasks. + cancel_handles: HashMap>, +} + +impl Stream for Downloads +where + ZN: Service + Send + Clone + 'static, + ZN::Future: Send, + ZV: Service + Send + Clone + 'static, + ZV::Future: Send, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // If no download and verify tasks have exited since the last poll, this + // task is scheduled for wakeup when the next task becomes ready. + // + // TODO: + // This would be cleaner with poll_map #63514, but that's nightly only. + if let Some(join_result) = ready!(this.pending.poll_next(cx)) { + match join_result.expect("transaction download and verify tasks must not panic") { + Ok(hash) => { + this.cancel_handles.remove(&hash); + Poll::Ready(Some(Ok(hash))) + } + Err((e, hash)) => { + this.cancel_handles.remove(&hash); + Poll::Ready(Some(Err(e))) + } + } + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option) { + self.pending.size_hint() + } +} + +impl Downloads +where + ZN: Service + Send + Clone + 'static, + ZN::Future: Send, + ZV: Service + Send + Clone + 'static, + ZV::Future: Send, +{ + /// Initialize a new download stream with the provided `network` and + /// `verifier` services. + /// + /// The [`Downloads`] stream is agnostic to the network policy, so retry and + /// timeout limits should be applied to the `network` service passed into + /// this constructor. + pub fn new(network: ZN, verifier: ZV) -> Self { + Self { + network, + verifier, + pending: FuturesUnordered::new(), + cancel_handles: HashMap::new(), + } + } + + /// Queue a transaction for download and verification. + /// + /// This method waits for the network to become ready, and returns an error + /// only if the network service fails. It returns immediately after queuing + /// the request. + #[instrument(skip(self, txid), fields(txid = %txid))] + pub async fn download_and_verify(&mut self, txid: UnminedTxId) -> Result<(), Report> { + if self.cancel_handles.contains_key(&txid) { + return Err(eyre!("duplicate txid queued for download: {:?}", txid)); + } + + // We construct the transaction requests sequentially, waiting for the peer + // set to be ready to process each request. This ensures that we start + // transaction downloads in the order we want them (though they may resolve + // out of order), and it means that we respect backpressure. Otherwise, + // if we waited for readiness and did the service call in the spawned + // tasks, all of the spawned tasks would race each other waiting for the + // network to become ready. + tracing::debug!("waiting to request transaction"); + let tx_req = self.network.ready_and().await.map_err(|e| eyre!(e))?.call( + zn::Request::TransactionsById(std::iter::once(txid).collect()), + ); + tracing::debug!("requested transaction"); + + // This oneshot is used to signal cancellation to the download task. + let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); + + let mut verifier = self.verifier.clone(); + let task = tokio::spawn( + async move { + // TODO: if the verifier and cancel are both ready, which should + // we prefer? (Currently, select! chooses one at random.) + let rsp = tokio::select! { + _ = &mut cancel_rx => { + tracing::trace!("task cancelled prior to download completion"); + metrics::counter!("mempool.cancelled.transaction.count", 1); + return Err("canceled trasaction download".into()) + } + rsp = tx_req => rsp?, + }; + + let tx = if let zn::Response::Transactions(txs) = rsp { + txs.into_iter() + .next() + .expect("successful response has the transaction in it") + } else { + unreachable!("wrong response to transaction request"); + }; + metrics::counter!("mempoool.downloaded.transaction.count", 1); + + let rsp = verifier.ready_and().await?.call(tx::Request::Mempool { + transaction: tx, + // TODO: which height to use? + height: Height(0), + }); + // TODO: if the verifier and cancel are both ready, which should + // we prefer? (Currently, select! chooses one at random.) + let verification = tokio::select! { + _ = &mut cancel_rx => { + tracing::trace!("task cancelled prior to verification"); + metrics::counter!("mempool.cancelled.verify.count", 1); + return Err("canceled transaction verification".into()) + } + verification = rsp => verification, + }; + if verification.is_ok() { + metrics::counter!("mempool.verified.transaction.count", 1); + } + + verification + } + .in_current_span() + // Tack the hash onto the error so we can remove the cancel handle + // on failure as well as on success. + .map_err(move |e| (e, txid)), + ); + + self.pending.push(task); + // XXX replace with expect_none when stable + assert!( + self.cancel_handles.insert(txid, cancel_tx).is_none(), + "transactions are only queued once" + ); + + Ok(()) + } + + /// Cancel all running tasks and reset the downloader state. + pub fn cancel_all(&mut self) { + // Replace the pending task list with an empty one and drop it. + let _ = std::mem::take(&mut self.pending); + // Signal cancellation to all running tasks. + // Since we already dropped the JoinHandles above, they should + // fail silently. + for (_hash, cancel) in self.cancel_handles.drain() { + let _ = cancel.send(()); + } + assert!(self.pending.is_empty()); + assert!(self.cancel_handles.is_empty()); + } + + /// Get the number of currently in-flight download tasks. + pub fn in_flight(&self) -> usize { + self.pending.len() + } +} From 5f2e74418de17f92fd677a3eb794525551dc480d Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Fri, 27 Aug 2021 11:31:23 -0300 Subject: [PATCH 02/13] Changed mempool downloader to be like inbound --- zebrad/src/components/mempool/downloads.rs | 211 ++++++++++++--------- 1 file changed, 119 insertions(+), 92 deletions(-) diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index f97367871f7..5196b37aae9 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -4,7 +4,6 @@ use std::{ task::{Context, Poll}, }; -use color_eyre::eyre::{eyre, Report}; use futures::{ future::TryFutureExt, ready, @@ -12,7 +11,7 @@ use futures::{ }; use pin_project::pin_project; use tokio::{sync::oneshot, task::JoinHandle}; -use tower::{hedge, Service, ServiceExt}; +use tower::{Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::{block::Height, transaction::UnminedTxId}; @@ -21,16 +20,45 @@ use zebra_network as zn; type BoxError = Box; -#[derive(Copy, Clone, Debug)] -pub(crate) struct AlwaysHedge; +/// The maximum number of concurrent inbound download and verify tasks. +/// +/// We expect the syncer to download and verify checkpoints, so this bound +/// can be small. +/// +/// ## Security +/// +/// We use a small concurrency limit, to prevent memory denial-of-service +/// attacks. +/// +/// The maximum block size is 2 million bytes. A deserialized malicious +/// block with ~225_000 transparent outputs can take up 9MB of RAM. As of +/// February 2021, a growing `Vec` can allocate up to 2x its current length, +/// leading to an overall memory usage of 18MB per malicious block. (See +/// #1880 for more details.) +/// +/// Malicious blocks will eventually timeout or fail contextual validation. +/// Once validation fails, the block is dropped, and its memory is deallocated. +/// +/// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks +/// will be directed to the malicious node that originally gossiped the hash. +/// Therefore, this attack can be carried out by a single malicious node. +const MAX_INBOUND_CONCURRENCY: usize = 10; -impl hedge::Policy for AlwaysHedge { - fn can_retry(&self, _req: &Request) -> bool { - true - } - fn clone_request(&self, req: &Request) -> Option { - Some(req.clone()) - } +/// The action taken in response to a peer's gossiped transaction hash. +pub enum DownloadAction { + /// The transaction hash was successfully queued for download and verification. + AddedToQueue, + + /// The transaction hash is already queued, so this request was ignored. + /// + /// Another peer has already gossiped the same hash to us. + AlreadyQueued, + + /// The queue is at capacity, so this request was ignored. + /// + /// The sync service should discover this transaction later, when we are closer + /// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`]. + FullQueue, } /// Represents a [`Stream`] of download and verification tasks. @@ -127,80 +155,90 @@ where /// Queue a transaction for download and verification. /// - /// This method waits for the network to become ready, and returns an error - /// only if the network service fails. It returns immediately after queuing - /// the request. + /// Returns the action taken in response to the queue request. #[instrument(skip(self, txid), fields(txid = %txid))] - pub async fn download_and_verify(&mut self, txid: UnminedTxId) -> Result<(), Report> { + pub fn download_and_verify(&mut self, txid: UnminedTxId) -> DownloadAction { if self.cancel_handles.contains_key(&txid) { - return Err(eyre!("duplicate txid queued for download: {:?}", txid)); + tracing::debug!( + ?txid, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "transaction id already queued for inbound download: ignored transaction" + ); + return DownloadAction::AlreadyQueued; } - // We construct the transaction requests sequentially, waiting for the peer - // set to be ready to process each request. This ensures that we start - // transaction downloads in the order we want them (though they may resolve - // out of order), and it means that we respect backpressure. Otherwise, - // if we waited for readiness and did the service call in the spawned - // tasks, all of the spawned tasks would race each other waiting for the - // network to become ready. - tracing::debug!("waiting to request transaction"); - let tx_req = self.network.ready_and().await.map_err(|e| eyre!(e))?.call( - zn::Request::TransactionsById(std::iter::once(txid).collect()), - ); - tracing::debug!("requested transaction"); + if self.pending.len() >= MAX_INBOUND_CONCURRENCY { + tracing::info!( + ?txid, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "too many transactions queued for inbound download: ignored transaction" + ); + return DownloadAction::FullQueue; + } // This oneshot is used to signal cancellation to the download task. let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); - let mut verifier = self.verifier.clone(); - let task = tokio::spawn( - async move { - // TODO: if the verifier and cancel are both ready, which should - // we prefer? (Currently, select! chooses one at random.) - let rsp = tokio::select! { - _ = &mut cancel_rx => { - tracing::trace!("task cancelled prior to download completion"); - metrics::counter!("mempool.cancelled.transaction.count", 1); - return Err("canceled trasaction download".into()) - } - rsp = tx_req => rsp?, - }; - - let tx = if let zn::Response::Transactions(txs) = rsp { - txs.into_iter() - .next() - .expect("successful response has the transaction in it") - } else { - unreachable!("wrong response to transaction request"); - }; - metrics::counter!("mempoool.downloaded.transaction.count", 1); - - let rsp = verifier.ready_and().await?.call(tx::Request::Mempool { + let network = self.network.clone(); + let verifier = self.verifier.clone(); + + let fut = async move { + // TODO: adapt this for transaction / mempool + // // Check if the block is already in the state. + // // BUG: check if the hash is in any chain (#862). + // // Depth only checks the main chain. + // match state.oneshot(zs::Request::Depth(hash)).await { + // Ok(zs::Response::Depth(None)) => Ok(()), + // Ok(zs::Response::Depth(Some(_))) => Err("already present".into()), + // Ok(_) => unreachable!("wrong response"), + // Err(e) => Err(e), + // }?; + + let tx = if let zn::Response::Transactions(txs) = network + .oneshot(zn::Request::TransactionsById( + std::iter::once(txid).collect(), + )) + .await? + { + txs.into_iter() + .next() + .expect("successful response has the transaction in it") + } else { + unreachable!("wrong response to transaction request"); + }; + metrics::counter!("gossip.downloaded.transaction.count", 1); + + verifier + .oneshot(tx::Request::Mempool { transaction: tx, - // TODO: which height to use? + // TODO: pass correct height height: Height(0), - }); - // TODO: if the verifier and cancel are both ready, which should - // we prefer? (Currently, select! chooses one at random.) - let verification = tokio::select! { - _ = &mut cancel_rx => { - tracing::trace!("task cancelled prior to verification"); - metrics::counter!("mempool.cancelled.verify.count", 1); - return Err("canceled transaction verification".into()) - } - verification = rsp => verification, - }; - if verification.is_ok() { - metrics::counter!("mempool.verified.transaction.count", 1); - } + }) + .await + } + .map_ok(|hash| { + metrics::counter!("gossip.verified.transaction.count", 1); + hash + }) + // Tack the hash onto the error so we can remove the cancel handle + // on failure as well as on success. + .map_err(move |e| (e, txid)) + .in_current_span(); - verification + let task = tokio::spawn(async move { + // TODO: if the verifier and cancel are both ready, which should we + // prefer? (Currently, select! chooses one at random.) + tokio::select! { + _ = &mut cancel_rx => { + tracing::trace!("task cancelled prior to completion"); + metrics::counter!("gossip.cancelled.count", 1); + Err(("canceled".into(), txid)) + } + verification = fut => verification, } - .in_current_span() - // Tack the hash onto the error so we can remove the cancel handle - // on failure as well as on success. - .map_err(move |e| (e, txid)), - ); + }); self.pending.push(task); // XXX replace with expect_none when stable @@ -209,25 +247,14 @@ where "transactions are only queued once" ); - Ok(()) - } - - /// Cancel all running tasks and reset the downloader state. - pub fn cancel_all(&mut self) { - // Replace the pending task list with an empty one and drop it. - let _ = std::mem::take(&mut self.pending); - // Signal cancellation to all running tasks. - // Since we already dropped the JoinHandles above, they should - // fail silently. - for (_hash, cancel) in self.cancel_handles.drain() { - let _ = cancel.send(()); - } - assert!(self.pending.is_empty()); - assert!(self.cancel_handles.is_empty()); - } + tracing::debug!( + ?txid, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "queued transaction hash for download" + ); + metrics::gauge!("gossip.queued.transaction.count", self.pending.len() as _); - /// Get the number of currently in-flight download tasks. - pub fn in_flight(&self) -> usize { - self.pending.len() + DownloadAction::AddedToQueue } } From 8cc6eef02501b3a35f4c7acc8edfe882d20800b2 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Fri, 27 Aug 2021 16:56:40 -0300 Subject: [PATCH 03/13] Verifier working (logs result) --- grafana/transaction_verification.json | 207 +++++++++++++++++++++ zebra-consensus/src/error.rs | 3 + zebra-consensus/src/transaction.rs | 11 +- zebrad/src/components/inbound.rs | 24 ++- zebrad/src/components/mempool.rs | 2 +- zebrad/src/components/mempool/downloads.rs | 52 ++++-- zebrad/src/components/sync.rs | 8 + 7 files changed, 280 insertions(+), 27 deletions(-) create mode 100644 grafana/transaction_verification.json diff --git a/grafana/transaction_verification.json b/grafana/transaction_verification.json new file mode 100644 index 00000000000..0e96e030d5c --- /dev/null +++ b/grafana/transaction_verification.json @@ -0,0 +1,207 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS-ZEBRA", + "label": "Prometheus-Zebra", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "8.1.2" + }, + { + "type": "panel", + "id": "graph", + "name": "Graph (old)", + "version": "" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": null, + "iteration": 1630092146360, + "links": [], + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 0 + }, + "hiddenSeries": false, + "id": 6, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "8.1.2", + "pointradius": 2, + "points": false, + "renderer": "flot", + "repeatDirection": "h", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "rate(gossip_downloaded_transaction_count{job=\"$job\"}[1m]) * 60", + "interval": "", + "legendFormat": "gossip_downloaded_transaction_count per min", + "refId": "C" + }, + { + "exemplar": true, + "expr": "rate(gossip_verified_transaction_count{job=\"$job\"}[1m]) * 60", + "interval": "", + "legendFormat": "gossip_verified_transaction_count per min", + "refId": "D" + }, + { + "exemplar": true, + "expr": "gossip_queued_transaction_count{job=\"$job\"}", + "interval": "", + "legendFormat": "gossip_queued_transaction_count", + "refId": "E" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Transaction Verifier Gossip Count - $job", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "refresh": "5s", + "schemaVersion": 30, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": null, + "current": {}, + "datasource": "${DS_PROMETHEUS-ZEBRA}", + "definition": "label_values(zcash_chain_verified_block_height, job)", + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": null, + "multi": true, + "name": "job", + "options": [], + "query": { + "query": "label_values(zcash_chain_verified_block_height, job)", + "refId": "StandardVariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tagsQuery": "", + "type": "query", + "useTags": false + } + ] + }, + "time": { + "from": "now-30m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "transaction verification", + "uid": "oBEHvS4nz", + "version": 2 +} \ No newline at end of file diff --git a/zebra-consensus/src/error.rs b/zebra-consensus/src/error.rs index 50a1ad7523a..b87f4008793 100644 --- a/zebra-consensus/src/error.rs +++ b/zebra-consensus/src/error.rs @@ -41,6 +41,9 @@ pub enum TransactionError { #[error("coinbase transaction MUST NOT have the EnableSpendsOrchard flag set")] CoinbaseHasEnableSpendsOrchard, + #[error("coinbase transaction MUST NOT exist in mempool")] + CoinbaseInMempool, + #[error("coinbase transaction failed subsidy validation")] Subsidy(#[from] SubsidyError), diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 42b0105fbe5..778af47af2c 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -168,11 +168,6 @@ where // TODO: break up each chunk into its own method fn call(&mut self, req: Request) -> Self::Future { - if req.is_mempool() { - // XXX determine exactly which rules apply to mempool transactions - unimplemented!("Zebra does not yet have a mempool (#2309)"); - } - let script_verifier = self.script_verifier.clone(); let network = self.network; @@ -187,7 +182,11 @@ where check::has_inputs_and_outputs(&tx)?; if tx.is_coinbase() { - check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?; + if req.is_mempool() { + return Err(TransactionError::CoinbaseInMempool); + } else { + check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?; + } } // [Canopy onward]: `vpub_old` MUST be zero. diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index a3005d2ae90..7dd35f7a36b 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -21,6 +21,8 @@ use zebra_consensus::transaction; use zebra_consensus::{chain::VerifyChainError, error::TransactionError}; use zebra_network::AddressBook; +use crate::components::sync::{TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT}; + // Re-use the syncer timeouts for consistency. use super::mempool::downloads::Downloads as TxDownloads; use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; @@ -36,7 +38,7 @@ type TxVerifier = Buffer< transaction::Request, >; type InboundDownloads = Downloads, Timeout, State>; -type InboundTxDownloads = TxDownloads, Timeout>; +type InboundTxDownloads = TxDownloads, Timeout, State>; pub type NetworkSetupData = (Outbound, Arc>); @@ -179,8 +181,9 @@ impl Service for Inbound { self.state.clone(), )); let tx_downloads = Box::pin(TxDownloads::new( - Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), - Timeout::new(tx_verifier, BLOCK_VERIFY_TIMEOUT), + Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT), + Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT), + self.state.clone(), )); result = Ok(()); Setup::Initialized { @@ -340,9 +343,18 @@ impl Service for Inbound { // TODO: send to Tx Download & Verify Stream async { Ok(zn::Response::Nil) }.boxed() } - zn::Request::AdvertiseTransactionIds(_transactions) => { - debug!("ignoring unimplemented request"); - // TODO: send to Tx Download & Verify Stream + zn::Request::AdvertiseTransactionIds(transactions) => { + if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup { + // TODO: check if we're close to the tip before proceeding? + // what do we do if it's not? + for txid in transactions { + tx_downloads.download_and_verify(txid); + } + } else { + info!( + "ignoring `AdvertiseTransactionIds` request from remote peer during network setup" + ); + } async { Ok(zn::Response::Nil) }.boxed() } zn::Request::AdvertiseBlock(hash) => { diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 187c599f98f..278fcb802be 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -18,9 +18,9 @@ use zebra_chain::{ use crate::BoxError; mod crawler; +pub mod downloads; mod error; mod storage; -pub mod downloads; #[cfg(test)] mod tests; diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 5196b37aae9..d823411b3f4 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -4,6 +4,7 @@ use std::{ task::{Context, Poll}, }; +use color_eyre::eyre::eyre; use futures::{ future::TryFutureExt, ready, @@ -14,9 +15,10 @@ use tokio::{sync::oneshot, task::JoinHandle}; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; -use zebra_chain::{block::Height, transaction::UnminedTxId}; +use zebra_chain::transaction::UnminedTxId; use zebra_consensus::transaction as tx; use zebra_network as zn; +use zebra_state as zs; type BoxError = Box; @@ -30,16 +32,16 @@ type BoxError = Box; /// We use a small concurrency limit, to prevent memory denial-of-service /// attacks. /// -/// The maximum block size is 2 million bytes. A deserialized malicious -/// block with ~225_000 transparent outputs can take up 9MB of RAM. As of +/// The maximum transaction size is 2 million bytes. A deserialized malicious +/// transaction with ~225_000 transparent outputs can take up 9MB of RAM. As of /// February 2021, a growing `Vec` can allocate up to 2x its current length, -/// leading to an overall memory usage of 18MB per malicious block. (See +/// leading to an overall memory usage of 18MB per malicious transaction. (See /// #1880 for more details.) /// -/// Malicious blocks will eventually timeout or fail contextual validation. +/// Malicious transactions will eventually timeout or fail contextual validation. /// Once validation fails, the block is dropped, and its memory is deallocated. /// -/// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks +/// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions /// will be directed to the malicious node that originally gossiped the hash. /// Therefore, this attack can be carried out by a single malicious node. const MAX_INBOUND_CONCURRENCY: usize = 10; @@ -64,12 +66,14 @@ pub enum DownloadAction { /// Represents a [`Stream`] of download and verification tasks. #[pin_project] #[derive(Debug)] -pub struct Downloads +pub struct Downloads where ZN: Service + Send + 'static, ZN::Future: Send, ZV: Service + Send + Clone + 'static, ZV::Future: Send, + ZS: Service + Send + Clone + 'static, + ZS::Future: Send, { // Services /// A service that forwards requests to connected peers, and returns their @@ -79,6 +83,9 @@ where /// A service that verifies downloaded transactions. verifier: ZV, + /// A service that manages cached blockchain state. + state: ZS, + // Internal downloads state /// A list of pending transaction download and verify tasks. #[pin] @@ -89,12 +96,14 @@ where cancel_handles: HashMap>, } -impl Stream for Downloads +impl Stream for Downloads where ZN: Service + Send + Clone + 'static, ZN::Future: Send, ZV: Service + Send + Clone + 'static, ZV::Future: Send, + ZS: Service + Send + Clone + 'static, + ZS::Future: Send, { type Item = Result; @@ -131,12 +140,14 @@ where } } -impl Downloads +impl Downloads where ZN: Service + Send + Clone + 'static, ZN::Future: Send, ZV: Service + Send + Clone + 'static, ZV::Future: Send, + ZS: Service + Send + Clone + 'static, + ZS::Future: Send, { /// Initialize a new download stream with the provided `network` and /// `verifier` services. @@ -144,10 +155,11 @@ where /// The [`Downloads`] stream is agnostic to the network policy, so retry and /// timeout limits should be applied to the `network` service passed into /// this constructor. - pub fn new(network: ZN, verifier: ZV) -> Self { + pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self { Self { network, verifier, + state, pending: FuturesUnordered::new(), cancel_handles: HashMap::new(), } @@ -183,6 +195,7 @@ where let network = self.network.clone(); let verifier = self.verifier.clone(); + let state = self.state.clone(); let fut = async move { // TODO: adapt this for transaction / mempool @@ -196,6 +209,14 @@ where // Err(e) => Err(e), // }?; + let height = match state.oneshot(zs::Request::Tip).await { + Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()), + Ok(zs::Response::Tip(Some((height, _hash)))) => Ok(height), + Ok(_) => unreachable!("wrong response"), + Err(e) => Err(e), + }?; + let height = (height + 1).ok_or_else(|| eyre!("no next height"))?; + let tx = if let zn::Response::Transactions(txs) = network .oneshot(zn::Request::TransactionsById( std::iter::once(txid).collect(), @@ -210,13 +231,16 @@ where }; metrics::counter!("gossip.downloaded.transaction.count", 1); - verifier + let result = verifier .oneshot(tx::Request::Mempool { transaction: tx, - // TODO: pass correct height - height: Height(0), + height, }) - .await + .await; + + tracing::debug!(?txid, ?result, "verified transaction for the mempool"); + + result } .map_ok(|hash| { metrics::counter!("gossip.verified.transaction.count", 1); diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 333ff3ff751..849745f8b20 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -116,6 +116,14 @@ pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); /// failure loop. pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(180); +/// Controls how long we wait for a transaction download request to complete. +/// TODO: review value and rationale +pub(super) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); + +/// Controls how long we wait for a transaction verify request to complete. +/// TODO: review value and rationale +pub(super) const TRANSACTION_VERIFY_TIMEOUT: Duration = Duration::from_secs(180); + /// Controls how long we wait to restart syncing after finishing a sync run. /// /// This delay should be long enough to: From 345c86db8acf3562fc8243f4d0db77d7171eee90 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Mon, 30 Aug 2021 12:04:24 -0300 Subject: [PATCH 04/13] Apply suggestions from code review Co-authored-by: teor --- zebra-consensus/src/error.rs | 2 +- zebrad/src/components/mempool/downloads.rs | 21 ++++++++++----------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/zebra-consensus/src/error.rs b/zebra-consensus/src/error.rs index b87f4008793..499b679040f 100644 --- a/zebra-consensus/src/error.rs +++ b/zebra-consensus/src/error.rs @@ -41,7 +41,7 @@ pub enum TransactionError { #[error("coinbase transaction MUST NOT have the EnableSpendsOrchard flag set")] CoinbaseHasEnableSpendsOrchard, - #[error("coinbase transaction MUST NOT exist in mempool")] + #[error("coinbase inputs MUST NOT exist in mempool")] CoinbaseInMempool, #[error("coinbase transaction failed subsidy validation")] diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index d823411b3f4..ac6048139ab 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -24,7 +24,7 @@ type BoxError = Box; /// The maximum number of concurrent inbound download and verify tasks. /// -/// We expect the syncer to download and verify checkpoints, so this bound +/// We expect the mempool crawler to download and verify most mempool transactions, so this bound /// can be small. /// /// ## Security @@ -33,13 +33,11 @@ type BoxError = Box; /// attacks. /// /// The maximum transaction size is 2 million bytes. A deserialized malicious -/// transaction with ~225_000 transparent outputs can take up 9MB of RAM. As of -/// February 2021, a growing `Vec` can allocate up to 2x its current length, -/// leading to an overall memory usage of 18MB per malicious transaction. (See -/// #1880 for more details.) +/// transaction with ~225_000 transparent outputs can take up 9MB of RAM. +/// (See #1880 for more details.) /// -/// Malicious transactions will eventually timeout or fail contextual validation. -/// Once validation fails, the block is dropped, and its memory is deallocated. +/// Malicious transactions will eventually timeout or fail validation. +/// Once validation fails, the transaction is dropped, and its memory is deallocated. /// /// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions /// will be directed to the malicious node that originally gossiped the hash. @@ -53,13 +51,15 @@ pub enum DownloadAction { /// The transaction hash is already queued, so this request was ignored. /// - /// Another peer has already gossiped the same hash to us. + /// Another peer has already gossiped the same hash to us, or the mempool crawler has fetched it. AlreadyQueued, /// The queue is at capacity, so this request was ignored. /// - /// The sync service should discover this transaction later, when we are closer - /// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`]. + /// The mempool crawler should discover this transaction later. + /// If it is mined into a block, it will be downloaded by the syncer, or the inbound block downloader. + /// + /// The queue's capacity is [`MAX_INBOUND_CONCURRENCY`]. FullQueue, } @@ -265,7 +265,6 @@ where }); self.pending.push(task); - // XXX replace with expect_none when stable assert!( self.cancel_handles.insert(txid, cancel_tx).is_none(), "transactions are only queued once" From 033bb287b7fcdbff545c15cb5ef5e5b79603a78d Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Mon, 30 Aug 2021 12:08:11 -0300 Subject: [PATCH 05/13] Apply suggestions from code review Co-authored-by: teor --- zebrad/src/components/mempool/downloads.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index ac6048139ab..1f3624de4bf 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -117,8 +117,7 @@ where // If no download and verify tasks have exited since the last poll, this // task is scheduled for wakeup when the next task becomes ready. // - // TODO: - // This would be cleaner with poll_map #63514, but that's nightly only. + // TODO: this would be cleaner with poll_map (#2693) if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("transaction download and verify tasks must not panic") { Ok(hash) => { From 37ddeb0e4126679fd63d220c2e1e2d0a04336d10 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Mon, 30 Aug 2021 12:06:08 -0300 Subject: [PATCH 06/13] Fix coinbase check for mempool, improve is_coinbase() docs --- zebra-chain/src/transaction.rs | 3 ++- zebra-consensus/src/transaction.rs | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/zebra-chain/src/transaction.rs b/zebra-chain/src/transaction.rs index a867d5390e9..c425bd4f088 100644 --- a/zebra-chain/src/transaction.rs +++ b/zebra-chain/src/transaction.rs @@ -376,7 +376,8 @@ impl Transaction { } } - /// Returns `true` if this transaction is a coinbase transaction. + /// Returns `true` if this transaction is a coinbase transaction, + /// that is, has a single input and it is a coinbase input. pub fn is_coinbase(&self) -> bool { self.inputs().len() == 1 && matches!( diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 778af47af2c..1382b393f47 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -182,11 +182,10 @@ where check::has_inputs_and_outputs(&tx)?; if tx.is_coinbase() { - if req.is_mempool() { - return Err(TransactionError::CoinbaseInMempool); - } else { - check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?; - } + check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?; + } + if req.is_mempool() && tx.contains_coinbase_input() { + return Err(TransactionError::CoinbaseInMempool); } // [Canopy onward]: `vpub_old` MUST be zero. From 8da4f402f54f8aee224144f96469272a521606b6 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Mon, 30 Aug 2021 12:06:54 -0300 Subject: [PATCH 07/13] Change other downloads.rs docs to reflect the mempool downloads.rs changes --- zebrad/src/components/inbound/downloads.rs | 10 +++------- zebrad/src/components/sync/downloads.rs | 4 +--- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index 320993e85b5..f832f382a98 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -32,10 +32,8 @@ type BoxError = Box; /// attacks. /// /// The maximum block size is 2 million bytes. A deserialized malicious -/// block with ~225_000 transparent outputs can take up 9MB of RAM. As of -/// February 2021, a growing `Vec` can allocate up to 2x its current length, -/// leading to an overall memory usage of 18MB per malicious block. (See -/// #1880 for more details.) +/// block with ~225_000 transparent outputs can take up 9MB of RAM. +/// (See #1880 for more details.) /// /// Malicious blocks will eventually timeout or fail contextual validation. /// Once validation fails, the block is dropped, and its memory is deallocated. @@ -116,8 +114,7 @@ where // If no download and verify tasks have exited since the last poll, this // task is scheduled for wakeup when the next task becomes ready. // - // TODO: - // This would be cleaner with poll_map #63514, but that's nightly only. + // TODO: this would be cleaner with poll_map (#2693) if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("block download and verify tasks must not panic") { Ok(hash) => { @@ -245,7 +242,6 @@ where }); self.pending.push(task); - // XXX replace with expect_none when stable assert!( self.cancel_handles.insert(hash, cancel_tx).is_none(), "blocks are only queued once" diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 11f7ab6ddbb..ce5089c2d32 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -80,8 +80,7 @@ where // If no download and verify tasks have exited since the last poll, this // task is scheduled for wakeup when the next task becomes ready. // - // TODO: - // This would be cleaner with poll_map #63514, but that's nightly only. + // TODO: this would be cleaner with poll_map (#2693) if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("block download and verify tasks must not panic") { Ok(hash) => { @@ -203,7 +202,6 @@ where ); self.pending.push(task); - // XXX replace with expect_none when stable assert!( self.cancel_handles.insert(hash, cancel_tx).is_none(), "blocks are only queued once" From a68bf1c4b0419c23ebb4b9d36a8c20fe6e5a3fa4 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Mon, 30 Aug 2021 14:19:47 -0300 Subject: [PATCH 08/13] Change TIMEOUTs to downloads.rs; add docs --- zebrad/src/components/inbound.rs | 6 +++--- zebrad/src/components/mempool/downloads.rs | 19 +++++++++++++++++++ zebrad/src/components/sync.rs | 8 -------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 7dd35f7a36b..a44debee644 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -21,10 +21,10 @@ use zebra_consensus::transaction; use zebra_consensus::{chain::VerifyChainError, error::TransactionError}; use zebra_network::AddressBook; -use crate::components::sync::{TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT}; - +use super::mempool::downloads::{ + Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, +}; // Re-use the syncer timeouts for consistency. -use super::mempool::downloads::Downloads as TxDownloads; use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; mod downloads; diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 1f3624de4bf..56b51bd2daa 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -2,6 +2,7 @@ use std::{ collections::HashMap, pin::Pin, task::{Context, Poll}, + time::Duration, }; use color_eyre::eyre::eyre; @@ -20,8 +21,26 @@ use zebra_consensus::transaction as tx; use zebra_network as zn; use zebra_state as zs; +use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; + type BoxError = Box; +/// Controls how long we wait for a transaction download request to complete. +/// +/// This is currently equal to [`crate::components::sync::BLOCK_DOWNLOAD_TIMEOUT`] for +/// consistency, even though parts of the rationale used for defining the value +/// don't apply here (e.g. we can drop transactions hashes when the queue is full). +pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT; + +/// Controls how long we wait for a transaction verify request to complete. +/// +/// This is currently equal to [`crate::components::sync::BLOCK_VERIFY_TIMEOUT`] for +/// consistency. +/// +/// This timeout may lead to denial of service, which will be handled in +/// https://github.com/ZcashFoundation/zebra/issues/2694 +pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT; + /// The maximum number of concurrent inbound download and verify tasks. /// /// We expect the mempool crawler to download and verify most mempool transactions, so this bound diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 849745f8b20..333ff3ff751 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -116,14 +116,6 @@ pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); /// failure loop. pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(180); -/// Controls how long we wait for a transaction download request to complete. -/// TODO: review value and rationale -pub(super) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); - -/// Controls how long we wait for a transaction verify request to complete. -/// TODO: review value and rationale -pub(super) const TRANSACTION_VERIFY_TIMEOUT: Duration = Duration::from_secs(180); - /// Controls how long we wait to restart syncing after finishing a sync run. /// /// This delay should be long enough to: From 315eaf536433597ef5e88c04fc584af41ac12402 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Tue, 31 Aug 2021 14:31:12 -0300 Subject: [PATCH 09/13] Renamed is_coinbase() to has_valid_coinbase_transaction_inputs() and contains_coinbase_input() to has_any_coinbase_inputs(); reorder checks --- zebra-chain/src/transaction.rs | 8 ++++---- zebra-chain/src/transaction/arbitrary.rs | 2 +- zebra-chain/src/transparent/utxo.rs | 2 +- zebra-consensus/src/block/check.rs | 4 ++-- zebra-consensus/src/transaction.rs | 10 +++++----- zebra-consensus/src/transaction/check.rs | 2 +- zebra-consensus/src/transaction/tests.rs | 15 ++++++++++----- zebra-state/src/service/check/utxo.rs | 2 +- zebra-state/src/service/tests.rs | 2 +- 9 files changed, 26 insertions(+), 21 deletions(-) diff --git a/zebra-chain/src/transaction.rs b/zebra-chain/src/transaction.rs index c425bd4f088..544f969dd66 100644 --- a/zebra-chain/src/transaction.rs +++ b/zebra-chain/src/transaction.rs @@ -376,9 +376,9 @@ impl Transaction { } } - /// Returns `true` if this transaction is a coinbase transaction, - /// that is, has a single input and it is a coinbase input. - pub fn is_coinbase(&self) -> bool { + /// Returns `true` if this transaction has valid inputs for a coinbase + /// transaction, that is, has a single input and it is a coinbase input. + pub fn has_valid_coinbase_transaction_inputs(&self) -> bool { self.inputs().len() == 1 && matches!( self.inputs().get(0), @@ -387,7 +387,7 @@ impl Transaction { } /// Returns `true` if transaction contains any coinbase inputs. - pub fn contains_coinbase_input(&self) -> bool { + pub fn has_any_coinbase_inputs(&self) -> bool { self.inputs() .iter() .any(|input| matches!(input, transparent::Input::Coinbase { .. })) diff --git a/zebra-chain/src/transaction/arbitrary.rs b/zebra-chain/src/transaction/arbitrary.rs index c9cf3686cba..2a3c1144616 100644 --- a/zebra-chain/src/transaction/arbitrary.rs +++ b/zebra-chain/src/transaction/arbitrary.rs @@ -428,7 +428,7 @@ impl Transaction { &mut self, outputs: &HashMap, ) -> Result, ValueBalanceError> { - if self.is_coinbase() { + if self.has_valid_coinbase_transaction_inputs() { // TODO: if needed, fixup coinbase: // - miner subsidy // - founders reward or funding streams (hopefully not?) diff --git a/zebra-chain/src/transparent/utxo.rs b/zebra-chain/src/transparent/utxo.rs index 99c5586b3fb..9362266df67 100644 --- a/zebra-chain/src/transparent/utxo.rs +++ b/zebra-chain/src/transparent/utxo.rs @@ -157,7 +157,7 @@ pub(crate) fn new_transaction_ordered_outputs( ) -> HashMap { let mut new_ordered_outputs = HashMap::new(); - let from_coinbase = transaction.is_coinbase(); + let from_coinbase = transaction.has_valid_coinbase_transaction_inputs(); for (output_index_in_transaction, output) in transaction.outputs().iter().cloned().enumerate() { let output_index_in_transaction = output_index_in_transaction .try_into() diff --git a/zebra-consensus/src/block/check.rs b/zebra-consensus/src/block/check.rs index e7620067b19..71d4de2e03e 100644 --- a/zebra-consensus/src/block/check.rs +++ b/zebra-consensus/src/block/check.rs @@ -27,10 +27,10 @@ pub fn coinbase_is_first(block: &Block) -> Result<(), BlockError> { .get(0) .ok_or(BlockError::NoTransactions)?; let mut rest = block.transactions.iter().skip(1); - if !first.is_coinbase() { + if !first.has_valid_coinbase_transaction_inputs() { return Err(TransactionError::CoinbasePosition)?; } - if rest.any(|tx| tx.contains_coinbase_input()) { + if rest.any(|tx| tx.has_any_coinbase_inputs()) { return Err(TransactionError::CoinbaseAfterFirst)?; } diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 1382b393f47..42dd7ce5fe9 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -181,12 +181,12 @@ where // Do basic checks first check::has_inputs_and_outputs(&tx)?; - if tx.is_coinbase() { - check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?; - } - if req.is_mempool() && tx.contains_coinbase_input() { + if req.is_mempool() && tx.has_any_coinbase_inputs() { return Err(TransactionError::CoinbaseInMempool); } + if tx.has_valid_coinbase_transaction_inputs() { + check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?; + } // [Canopy onward]: `vpub_old` MUST be zero. // https://zips.z.cash/protocol/protocol.pdf#joinsplitdesc @@ -390,7 +390,7 @@ where ) -> Result { let transaction = request.transaction(); - if transaction.is_coinbase() { + if transaction.has_valid_coinbase_transaction_inputs() { // The script verifier only verifies PrevOut inputs and their corresponding UTXOs. // Coinbase transactions don't have any PrevOut inputs. Ok(AsyncChecks::new()) diff --git a/zebra-consensus/src/transaction/check.rs b/zebra-consensus/src/transaction/check.rs index 7dfb224111d..3dd667f4a20 100644 --- a/zebra-consensus/src/transaction/check.rs +++ b/zebra-consensus/src/transaction/check.rs @@ -51,7 +51,7 @@ pub fn has_inputs_and_outputs(tx: &Transaction) -> Result<(), TransactionError> /// /// https://zips.z.cash/protocol/protocol.pdf#txnencodingandconsensus pub fn coinbase_tx_no_prevout_joinsplit_spend(tx: &Transaction) -> Result<(), TransactionError> { - if tx.is_coinbase() { + if tx.has_valid_coinbase_transaction_inputs() { if tx.contains_prevout_input() { return Err(TransactionError::CoinbaseHasPrevOutInput); } else if tx.joinsplit_count() > 0 { diff --git a/zebra-consensus/src/transaction/tests.rs b/zebra-consensus/src/transaction/tests.rs index 800b06db8d1..2efb92b377d 100644 --- a/zebra-consensus/src/transaction/tests.rs +++ b/zebra-consensus/src/transaction/tests.rs @@ -165,7 +165,7 @@ fn v5_coinbase_transaction_without_enable_spends_flag_passes_validation() { zebra_test::vectors::MAINNET_BLOCKS.iter(), ) .rev() - .find(|transaction| transaction.is_coinbase()) + .find(|transaction| transaction.has_valid_coinbase_transaction_inputs()) .expect("At least one fake V5 coinbase transaction in the test vectors"); insert_fake_orchard_shielded_data(&mut transaction); @@ -180,7 +180,7 @@ fn v5_coinbase_transaction_with_enable_spends_flag_fails_validation() { zebra_test::vectors::MAINNET_BLOCKS.iter(), ) .rev() - .find(|transaction| transaction.is_coinbase()) + .find(|transaction| transaction.has_valid_coinbase_transaction_inputs()) .expect("At least one fake V5 coinbase transaction in the test vectors"); let shielded_data = insert_fake_orchard_shielded_data(&mut transaction); @@ -702,7 +702,8 @@ fn v4_with_sapling_spends() { let (height, transaction) = test_transactions(network) .rev() .filter(|(_, transaction)| { - !transaction.is_coinbase() && transaction.inputs().is_empty() + !transaction.has_valid_coinbase_transaction_inputs() + && transaction.inputs().is_empty() }) .find(|(_, transaction)| transaction.sapling_spends_per_anchor().next().is_some()) .expect("No transaction found with Sapling spends"); @@ -739,7 +740,8 @@ fn v4_with_sapling_outputs_and_no_spends() { let (height, transaction) = test_transactions(network) .rev() .filter(|(_, transaction)| { - !transaction.is_coinbase() && transaction.inputs().is_empty() + !transaction.has_valid_coinbase_transaction_inputs() + && transaction.inputs().is_empty() }) .find(|(_, transaction)| { transaction.sapling_spends_per_anchor().next().is_none() @@ -781,7 +783,10 @@ fn v5_with_sapling_spends() { let transaction = fake_v5_transactions_for_network(network, zebra_test::vectors::MAINNET_BLOCKS.iter()) .rev() - .filter(|transaction| !transaction.is_coinbase() && transaction.inputs().is_empty()) + .filter(|transaction| { + !transaction.has_valid_coinbase_transaction_inputs() + && transaction.inputs().is_empty() + }) .find(|transaction| transaction.sapling_spends_per_anchor().next().is_some()) .expect("No transaction found with Sapling spends"); diff --git a/zebra-state/src/service/check/utxo.rs b/zebra-state/src/service/check/utxo.rs index d402b551c15..927aa4b7036 100644 --- a/zebra-state/src/service/check/utxo.rs +++ b/zebra-state/src/service/check/utxo.rs @@ -227,7 +227,7 @@ pub fn remaining_transaction_value( ) -> Result<(), ValidateContextError> { for (tx_index_in_block, transaction) in prepared.block.transactions.iter().enumerate() { // TODO: check coinbase transaction remaining value (#338, #1162) - if transaction.is_coinbase() { + if transaction.has_valid_coinbase_transaction_inputs() { continue; } diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index b567f310545..78f3e73bd23 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -91,7 +91,7 @@ async fn test_populated_state_responds_correctly( Ok(Response::Transaction(Some(transaction.clone()))), )); - let from_coinbase = transaction.is_coinbase(); + let from_coinbase = transaction.has_valid_coinbase_transaction_inputs(); for (index, output) in transaction.outputs().iter().cloned().enumerate() { let outpoint = transparent::OutPoint { hash: transaction_hash, From d90463c3952bc9652c44aa2ee923b2237f50a745 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Tue, 31 Aug 2021 15:32:17 -0300 Subject: [PATCH 10/13] Validate network upgrade for V4 transactions; check before computing sighash (for V5 too) --- zebra-consensus/src/transaction.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 42dd7ce5fe9..ad88dca4b4e 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -279,6 +279,9 @@ where ) -> Result { let tx = request.transaction(); let upgrade = request.upgrade(network); + + Self::verify_v4_transaction_network_upgrade(&tx, upgrade)?; + let shielded_sighash = tx.sighash(upgrade, HashType::ALL, None); Ok( @@ -299,6 +302,29 @@ where ) } + /// Verifies if a V4 `transaction` is supported by `network_upgrade`. + fn verify_v4_transaction_network_upgrade( + transaction: &Transaction, + network_upgrade: NetworkUpgrade, + ) -> Result<(), TransactionError> { + match network_upgrade { + // Supports V4 transactions + NetworkUpgrade::Sapling + | NetworkUpgrade::Blossom + | NetworkUpgrade::Heartwood + | NetworkUpgrade::Canopy + | NetworkUpgrade::Nu5 => Ok(()), + + // Does not support V4 transactions + NetworkUpgrade::Genesis + | NetworkUpgrade::BeforeOverwinter + | NetworkUpgrade::Overwinter => Err(TransactionError::UnsupportedByNetworkUpgrade( + transaction.version(), + network_upgrade, + )), + } + } + /// Verify a V5 transaction. /// /// Returns a set of asynchronous checks that must all succeed for the transaction to be @@ -328,10 +354,11 @@ where ) -> Result { let transaction = request.transaction(); let upgrade = request.upgrade(network); - let shielded_sighash = transaction.sighash(upgrade, HashType::ALL, None); Self::verify_v5_transaction_network_upgrade(&transaction, upgrade)?; + let shielded_sighash = transaction.sighash(upgrade, HashType::ALL, None); + let _async_checks = Self::verify_transparent_inputs_and_outputs( &request, network, From a3b73f5cb2832a5da842d28f135190aa4de9c6b6 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Tue, 31 Aug 2021 16:56:40 -0300 Subject: [PATCH 11/13] Add block_ prefix to downloads and verifier --- zebrad/src/components/inbound.rs | 37 +++++++++++++++++--------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index a44debee644..0d27d417a8c 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -32,12 +32,12 @@ use downloads::Downloads; type Outbound = Buffer, zn::Request>; type State = Buffer, zs::Request>; -type Verifier = Buffer, block::Hash, VerifyChainError>, Arc>; +type BlockVerifier = Buffer, block::Hash, VerifyChainError>, Arc>; type TxVerifier = Buffer< BoxService, transaction::Request, >; -type InboundDownloads = Downloads, Timeout, State>; +type InboundDownloads = Downloads, Timeout, State>; type InboundTxDownloads = TxDownloads, Timeout, State>; pub type NetworkSetupData = (Outbound, Arc>); @@ -53,9 +53,9 @@ pub enum Setup { /// after the network is set up. network_setup: oneshot::Receiver, - /// A service that verifies downloaded blocks. Given to `downloads` + /// A service that verifies downloaded blocks. Given to `block_downloads` /// after the network is set up. - verifier: Verifier, + block_verifier: BlockVerifier, /// A service that verifies downloaded transactions. Given to `tx_downloads` /// after the network is set up. @@ -70,7 +70,7 @@ pub enum Setup { address_book: Arc>, /// A `futures::Stream` that downloads and verifies gossiped blocks. - downloads: Pin>, + block_downloads: Pin>, tx_downloads: Pin>, }, @@ -132,13 +132,13 @@ impl Inbound { pub fn new( network_setup: oneshot::Receiver, state: State, - verifier: Verifier, + block_verifier: BlockVerifier, tx_verifier: TxVerifier, ) -> Self { Self { network_setup: Setup::AwaitingNetwork { network_setup, - verifier, + block_verifier, tx_verifier, }, state, @@ -171,13 +171,13 @@ impl Service for Inbound { self.network_setup = match self.take_setup() { Setup::AwaitingNetwork { mut network_setup, - verifier, + block_verifier, tx_verifier, } => match network_setup.try_recv() { Ok((outbound, address_book)) => { - let downloads = Box::pin(Downloads::new( + let block_downloads = Box::pin(Downloads::new( Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT), - Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT), + Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT), self.state.clone(), )); let tx_downloads = Box::pin(TxDownloads::new( @@ -188,7 +188,7 @@ impl Service for Inbound { result = Ok(()); Setup::Initialized { address_book, - downloads, + block_downloads, tx_downloads, } } @@ -197,7 +197,7 @@ impl Service for Inbound { result = Ok(()); Setup::AwaitingNetwork { network_setup, - verifier, + block_verifier, tx_verifier, } } @@ -219,16 +219,16 @@ impl Service for Inbound { // Clean up completed download tasks, ignoring their results Setup::Initialized { address_book, - mut downloads, + mut block_downloads, mut tx_downloads, } => { - while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} + while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {} while let Poll::Ready(Some(_)) = tx_downloads.as_mut().poll_next(cx) {} result = Ok(()); Setup::Initialized { address_book, - downloads, + block_downloads, tx_downloads, } } @@ -358,8 +358,11 @@ impl Service for Inbound { async { Ok(zn::Response::Nil) }.boxed() } zn::Request::AdvertiseBlock(hash) => { - if let Setup::Initialized { downloads, .. } = &mut self.network_setup { - downloads.download_and_verify(hash); + if let Setup::Initialized { + block_downloads, .. + } = &mut self.network_setup + { + block_downloads.download_and_verify(hash); } else { info!( ?hash, From 1b01f3c1f65da3ec2bf1d135a73596284df2c38e Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 1 Sep 2021 14:08:41 -0300 Subject: [PATCH 12/13] Update zebra-consensus/src/transaction.rs Co-authored-by: teor --- zebra-consensus/src/transaction.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index ad88dca4b4e..10072b960d4 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -309,6 +309,13 @@ where ) -> Result<(), TransactionError> { match network_upgrade { // Supports V4 transactions + // + // Consensus rules: + // > [Sapling to Canopy inclusive, pre-NU5] The transaction version number MUST be 4, ... + // > + // > [NU5 onward] The transaction version number MUST be 4 or 5. + // + // https://zips.z.cash/protocol/protocol.pdf#txnconsensus NetworkUpgrade::Sapling | NetworkUpgrade::Blossom | NetworkUpgrade::Heartwood From 6969c3109ca46700b50d5cd0512ad0ba9c427100 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 1 Sep 2021 14:27:21 -0300 Subject: [PATCH 13/13] Add consensus doc; add more Block prefixes --- zebra-consensus/src/transaction.rs | 5 +++++ zebrad/src/components/inbound.rs | 8 ++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 10072b960d4..c1456189cee 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -398,6 +398,11 @@ where ) -> Result<(), TransactionError> { match network_upgrade { // Supports V5 transactions + // + // Consensus rules: + // > [NU5 onward] The transaction version number MUST be 4 or 5. + // + // https://zips.z.cash/protocol/protocol.pdf#txnconsensus NetworkUpgrade::Nu5 => Ok(()), // Does not support V5 transactions diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 0d27d417a8c..271b5126b6a 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -28,7 +28,7 @@ use super::mempool::downloads::{ use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; mod downloads; -use downloads::Downloads; +use downloads::Downloads as BlockDownloads; type Outbound = Buffer, zn::Request>; type State = Buffer, zs::Request>; @@ -37,7 +37,7 @@ type TxVerifier = Buffer< BoxService, transaction::Request, >; -type InboundDownloads = Downloads, Timeout, State>; +type InboundBlockDownloads = BlockDownloads, Timeout, State>; type InboundTxDownloads = TxDownloads, Timeout, State>; pub type NetworkSetupData = (Outbound, Arc>); @@ -70,7 +70,7 @@ pub enum Setup { address_book: Arc>, /// A `futures::Stream` that downloads and verifies gossiped blocks. - block_downloads: Pin>, + block_downloads: Pin>, tx_downloads: Pin>, }, @@ -175,7 +175,7 @@ impl Service for Inbound { tx_verifier, } => match network_setup.try_recv() { Ok((outbound, address_book)) => { - let block_downloads = Box::pin(Downloads::new( + let block_downloads = Box::pin(BlockDownloads::new( Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT), Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT), self.state.clone(),