diff --git a/api/src/handlers/pool_api.rs b/api/src/handlers/pool_api.rs index b4e49bd1ef..1b07333605 100644 --- a/api/src/handlers/pool_api.rs +++ b/api/src/handlers/pool_api.rs @@ -82,10 +82,7 @@ impl PoolPushHandler { .map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into()) }) .and_then(move |tx: Transaction| { - let source = pool::TxSource { - debug_name: "push-api".to_string(), - identifier: "?.?.?.?".to_string(), - }; + let source = pool::TxSource::PushApi; info!( "Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})", tx.hash(), diff --git a/config/src/comments.rs b/config/src/comments.rs index 4ed227896c..1262b12e93 100644 --- a/config/src/comments.rs +++ b/config/src/comments.rs @@ -210,6 +210,14 @@ fn comments() -> HashMap { .to_string(), ); + retval.insert( + "always_stem_our_txs".to_string(), + " +#always stem our (pushed via api) txs regardless of stem/fluff epoch (as per Dandelion++ paper) +" + .to_string(), + ); + retval.insert( "[server.p2p_config]".to_string(), "#test miner wallet URL (burns if this doesn't exist) diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 3f7052861c..d382182184 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -202,10 +202,10 @@ impl Pool { fn log_pool_add(&self, entry: &PoolEntry, header: &BlockHeader) { debug!( - "add_to_pool [{}]: {} ({}) [in/out/kern: {}/{}/{}] pool: {} (at block {})", + "add_to_pool [{}]: {} ({:?}) [in/out/kern: {}/{}/{}] pool: {} (at block {})", self.name, entry.tx.hash(), - entry.src.debug_name, + entry.src, entry.tx.inputs().len(), entry.tx.outputs().len(), entry.tx.kernels().len(), diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 5471775a6b..ee58d187b0 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -108,7 +108,7 @@ impl TransactionPool { tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?; entry.tx = tx; - entry.src.debug_name = "deagg".to_string(); + entry.src = TxSource::Deaggregate; } } self.txpool.add_to_pool(entry.clone(), vec![], header)?; @@ -169,12 +169,12 @@ impl TransactionPool { if !stem || self .add_to_stempool(entry.clone(), header) - .and_then(|_| self.adapter.stem_tx_accepted(&entry.tx)) + .and_then(|_| self.adapter.stem_tx_accepted(&entry)) .is_err() { self.add_to_txpool(entry.clone(), header)?; self.add_to_reorg_cache(entry.clone()); - self.adapter.tx_accepted(&entry.tx); + self.adapter.tx_accepted(&entry); } // Transaction passed all the checks but we have to make space for it diff --git a/pool/src/types.rs b/pool/src/types.rs index 3faee7e659..0bc3ed8dc4 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -39,23 +39,32 @@ const DANDELION_AGGREGATION_SECS: u16 = 30; /// Dandelion stem probability (stem 90% of the time, fluff 10%). const DANDELION_STEM_PROBABILITY: u8 = 90; +/// Always stem our (pushed via api) txs? +/// Defaults to true to match the Dandelion++ paper. +/// But can be overridden to allow a node to fluff our txs if desired. +/// If set to false we will stem/fluff our txs as per current epoch. +const DANDELION_ALWAYS_STEM_OUR_TXS: bool = true; + /// Configuration for "Dandelion". /// Note: shared between p2p and pool. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct DandelionConfig { /// Length of each "epoch". #[serde(default = "default_dandelion_epoch_secs")] - pub epoch_secs: Option, + pub epoch_secs: u16, /// Dandelion embargo timer. Fluff and broadcast individual txs if not seen /// on network before embargo expires. #[serde(default = "default_dandelion_embargo_secs")] - pub embargo_secs: Option, + pub embargo_secs: u16, /// Dandelion aggregation timer. #[serde(default = "default_dandelion_aggregation_secs")] - pub aggregation_secs: Option, + pub aggregation_secs: u16, /// Dandelion stem probability (stem 90% of the time, fluff 10% etc.) #[serde(default = "default_dandelion_stem_probability")] - pub stem_probability: Option, + pub stem_probability: u8, + /// Default to always stem our txs as described in Dandelion++ paper. + #[serde(default = "default_dandelion_always_stem_our_txs")] + pub always_stem_our_txs: bool, } impl Default for DandelionConfig { @@ -65,24 +74,29 @@ impl Default for DandelionConfig { embargo_secs: default_dandelion_embargo_secs(), aggregation_secs: default_dandelion_aggregation_secs(), stem_probability: default_dandelion_stem_probability(), + always_stem_our_txs: default_dandelion_always_stem_our_txs(), } } } -fn default_dandelion_epoch_secs() -> Option { - Some(DANDELION_EPOCH_SECS) +fn default_dandelion_epoch_secs() -> u16 { + DANDELION_EPOCH_SECS +} + +fn default_dandelion_embargo_secs() -> u16 { + DANDELION_EMBARGO_SECS } -fn default_dandelion_embargo_secs() -> Option { - Some(DANDELION_EMBARGO_SECS) +fn default_dandelion_aggregation_secs() -> u16 { + DANDELION_AGGREGATION_SECS } -fn default_dandelion_aggregation_secs() -> Option { - Some(DANDELION_AGGREGATION_SECS) +fn default_dandelion_stem_probability() -> u8 { + DANDELION_STEM_PROBABILITY } -fn default_dandelion_stem_probability() -> Option { - Some(DANDELION_STEM_PROBABILITY) +fn default_dandelion_always_stem_our_txs() -> bool { + DANDELION_ALWAYS_STEM_OUR_TXS } /// Transaction pool configuration @@ -145,20 +159,29 @@ pub struct PoolEntry { pub tx: Transaction, } -/// Placeholder: the data representing where we heard about a tx from. -/// /// Used to make decisions based on transaction acceptance priority from /// various sources. For example, a node may want to bypass pool size /// restrictions when accepting a transaction from a local wallet. /// /// Most likely this will evolve to contain some sort of network identifier, /// once we get a better sense of what transaction building might look like. -#[derive(Clone, Debug)] -pub struct TxSource { - /// Human-readable name used for logging and errors. - pub debug_name: String, - /// Unique identifier used to distinguish this peer from others. - pub identifier: String, +#[derive(Clone, Debug, PartialEq)] +pub enum TxSource { + PushApi, + Broadcast, + Fluff, + EmbargoExpired, + Deaggregate, +} + +impl TxSource { + /// Convenience fn for checking if this tx was sourced via the push api. + pub fn is_pushed(&self) -> bool { + match self { + TxSource::PushApi => true, + _ => false, + } + } } /// Possible errors when interacting with the transaction pool. @@ -250,10 +273,10 @@ pub trait BlockChain: Sync + Send { /// importantly the broadcasting of transactions to our peers. pub trait PoolAdapter: Send + Sync { /// The transaction pool has accepted this transaction as valid. - fn tx_accepted(&self, tx: &transaction::Transaction); + fn tx_accepted(&self, entry: &PoolEntry); /// The stem transaction pool has accepted this transactions as valid. - fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; + fn stem_tx_accepted(&self, entry: &PoolEntry) -> Result<(), PoolError>; } /// Dummy adapter used as a placeholder for real implementations @@ -261,8 +284,8 @@ pub trait PoolAdapter: Send + Sync { pub struct NoopAdapter {} impl PoolAdapter for NoopAdapter { - fn tx_accepted(&self, _tx: &transaction::Transaction) {} - fn stem_tx_accepted(&self, _tx: &transaction::Transaction) -> Result<(), PoolError> { + fn tx_accepted(&self, _entry: &PoolEntry) {} + fn stem_tx_accepted(&self, _entry: &PoolEntry) -> Result<(), PoolError> { Ok(()) } } diff --git a/pool/tests/common.rs b/pool/tests/common.rs index 3ddb256bfa..e02d9764e3 100644 --- a/pool/tests/common.rs +++ b/pool/tests/common.rs @@ -229,10 +229,7 @@ where } pub fn test_source() -> TxSource { - TxSource { - debug_name: format!("test"), - identifier: format!("127.0.0.1"), - } + TxSource::Broadcast } pub fn clean_output_dir(db_root: String) { diff --git a/pool/tests/transaction_pool.rs b/pool/tests/transaction_pool.rs index eaf630f5c1..f5a73deee3 100644 --- a/pool/tests/transaction_pool.rs +++ b/pool/tests/transaction_pool.rs @@ -19,10 +19,12 @@ use self::core::core::{transaction, Block, BlockHeader, Weighting}; use self::core::libtx; use self::core::pow::Difficulty; use self::keychain::{ExtKeychain, Keychain}; +use self::pool::TxSource; use self::util::RwLock; use crate::common::*; use grin_core as core; use grin_keychain as keychain; +use grin_pool as pool; use grin_util as util; use std::sync::Arc; @@ -237,7 +239,7 @@ fn test_the_transaction_pool() { assert_eq!(write_pool.total_size(), 6); let entry = write_pool.txpool.entries.last().unwrap(); assert_eq!(entry.tx.kernels().len(), 1); - assert_eq!(entry.src.debug_name, "deagg"); + assert_eq!(entry.src, TxSource::Deaggregate); } // Check we cannot "double spend" an output spent in a previous block. @@ -447,7 +449,7 @@ fn test_the_transaction_pool() { assert_eq!(write_pool.total_size(), 6); let entry = write_pool.txpool.entries.last().unwrap(); assert_eq!(entry.tx.kernels().len(), 1); - assert_eq!(entry.src.debug_name, "deagg"); + assert_eq!(entry.src, TxSource::Deaggregate); } // Check we cannot "double spend" an output spent in a previous block. diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 91f20975a7..bdc735ca22 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -37,7 +37,6 @@ use crate::core::{core, global}; use crate::p2p; use crate::p2p::types::PeerInfo; use crate::pool; -use crate::pool::types::DandelionConfig; use crate::util::OneTime; use chrono::prelude::*; use chrono::Duration; @@ -97,10 +96,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { return Ok(true); } - let source = pool::TxSource { - debug_name: "p2p".to_string(), - identifier: "?.?.?.?".to_string(), - }; + let source = pool::TxSource::Broadcast; let header = self.chain().head_header()?; @@ -804,11 +800,11 @@ impl DandelionAdapter for PoolToNetAdapter { } impl pool::PoolAdapter for PoolToNetAdapter { - fn tx_accepted(&self, tx: &core::Transaction) { - self.peers().broadcast_transaction(tx); + fn tx_accepted(&self, entry: &pool::PoolEntry) { + self.peers().broadcast_transaction(&entry.tx); } - fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { + fn stem_tx_accepted(&self, entry: &pool::PoolEntry) -> Result<(), pool::PoolError> { // Take write lock on the current epoch. // We need to be able to update the current relay peer if not currently connected. let mut epoch = self.dandelion_epoch.write(); @@ -816,9 +812,10 @@ impl pool::PoolAdapter for PoolToNetAdapter { // If "stem" epoch attempt to relay the tx to the next Dandelion relay. // Fallback to immediately fluffing the tx if we cannot stem for any reason. // If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor). - if epoch.is_stem() { + // If node is configured to always stem our (pushed via api) txs then do so. + if epoch.is_stem() || (entry.src.is_pushed() && epoch.always_stem_our_txs()) { if let Some(peer) = epoch.relay_peer(&self.peers()) { - match peer.send_stem_transaction(tx) { + match peer.send_stem_transaction(&entry.tx) { Ok(_) => { info!("Stemming this epoch, relaying to next peer."); Ok(()) @@ -841,7 +838,7 @@ impl pool::PoolAdapter for PoolToNetAdapter { impl PoolToNetAdapter { /// Create a new pool to net adapter - pub fn new(config: DandelionConfig) -> PoolToNetAdapter { + pub fn new(config: pool::DandelionConfig) -> PoolToNetAdapter { PoolToNetAdapter { peers: OneTime::new(), dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))), diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 9b119446ca..3a688ee159 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -496,8 +496,8 @@ impl DandelionEpoch { match self.start_time { None => true, Some(start_time) => { - let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64; - Utc::now().timestamp().saturating_sub(start_time) > epoch_secs + let epoch_secs = self.config.epoch_secs; + Utc::now().timestamp().saturating_sub(start_time) > epoch_secs as i64 } } } @@ -511,10 +511,7 @@ impl DandelionEpoch { // If stem_probability == 90 then we stem 90% of the time. let mut rng = rand::thread_rng(); - let stem_probability = self - .config - .stem_probability - .expect("stem_probability config missing"); + let stem_probability = self.config.stem_probability; self.is_stem = rng.gen_range(0, 100) < stem_probability; let addr = self.relay_peer.clone().map(|p| p.info.addr); @@ -529,6 +526,11 @@ impl DandelionEpoch { self.is_stem } + /// Always stem our (pushed via api) txs regardless of stem/fluff epoch? + pub fn always_stem_our_txs(&self) -> bool { + self.config.always_stem_our_txs + } + /// What is our current relay peer? /// If it is not connected then choose a new one. pub fn relay_peer(&mut self, peers: &Arc) -> Option> { diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 685d36c9d6..faed0cce1c 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -113,9 +113,7 @@ fn process_fluff_phase( return Ok(()); } - let cutoff_secs = dandelion_config - .aggregation_secs - .expect("aggregation secs config missing"); + let cutoff_secs = dandelion_config.aggregation_secs; let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs); // If epoch is expired, fluff *all* outstanding entries in stempool. @@ -149,12 +147,7 @@ fn process_fluff_phase( verifier_cache.clone(), )?; - let src = TxSource { - debug_name: "fluff".to_string(), - identifier: "?.?.?.?".to_string(), - }; - - tx_pool.add_to_pool(src, agg_tx, false, &header)?; + tx_pool.add_to_pool(TxSource::Fluff, agg_tx, false, &header)?; Ok(()) } @@ -165,10 +158,7 @@ fn process_expired_entries( // Take a write lock on the txpool for the duration of this processing. let mut tx_pool = tx_pool.write(); - let embargo_secs = dandelion_config - .embargo_secs - .expect("embargo_secs config missing") - + thread_rng().gen_range(0, 31); + let embargo_secs = dandelion_config.embargo_secs + thread_rng().gen_range(0, 31); let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs); if expired_entries.is_empty() { @@ -179,14 +169,9 @@ fn process_expired_entries( let header = tx_pool.chain_head()?; - let src = TxSource { - debug_name: "embargo_expired".to_string(), - identifier: "?.?.?.?".to_string(), - }; - for entry in expired_entries { let txhash = entry.tx.hash(); - match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) { + match tx_pool.add_to_pool(TxSource::EmbargoExpired, entry.tx, false, &header) { Ok(_) => info!( "dand_mon: embargo expired for {}, fluffed successfully.", txhash