Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always stem local txs if configured that way (unless explicitly fluffed) #2876

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions api/src/handlers/pool_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ impl PoolPushHandler {
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx: Transaction| {
let source = pool::TxSource {
debug_name: "push-api".to_string(),
identifier: "?.?.?.?".to_string(),
};
let source = pool::TxSource::PushApi;
info!(
"Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})",
tx.hash(),
Expand Down
8 changes: 8 additions & 0 deletions config/src/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ fn comments() -> HashMap<String, String> {
.to_string(),
);

retval.insert(
"always_stem_our_txs".to_string(),
"
#always stem our (pushed via api) txs regardless of stem/fluff epoch (as per Dandelion++ paper)
"
.to_string(),
);

retval.insert(
"[server.p2p_config]".to_string(),
"#test miner wallet URL (burns if this doesn't exist)
Expand Down
4 changes: 2 additions & 2 deletions pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ impl Pool {

fn log_pool_add(&self, entry: &PoolEntry, header: &BlockHeader) {
debug!(
"add_to_pool [{}]: {} ({}) [in/out/kern: {}/{}/{}] pool: {} (at block {})",
"add_to_pool [{}]: {} ({:?}) [in/out/kern: {}/{}/{}] pool: {} (at block {})",
self.name,
entry.tx.hash(),
entry.src.debug_name,
entry.src,
entry.tx.inputs().len(),
entry.tx.outputs().len(),
entry.tx.kernels().len(),
Expand Down
6 changes: 3 additions & 3 deletions pool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl TransactionPool {
tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?;

entry.tx = tx;
entry.src.debug_name = "deagg".to_string();
entry.src = TxSource::Deaggregate;
}
}
self.txpool.add_to_pool(entry.clone(), vec![], header)?;
Expand Down Expand Up @@ -169,12 +169,12 @@ impl TransactionPool {
if !stem
|| self
.add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
.and_then(|_| self.adapter.stem_tx_accepted(&entry))
.is_err()
{
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry.tx);
self.adapter.tx_accepted(&entry);
}

// Transaction passed all the checks but we have to make space for it
Expand Down
71 changes: 47 additions & 24 deletions pool/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,32 @@ const DANDELION_AGGREGATION_SECS: u16 = 30;
/// Dandelion stem probability (stem 90% of the time, fluff 10%).
const DANDELION_STEM_PROBABILITY: u8 = 90;

/// Always stem our (pushed via api) txs?
/// Defaults to true to match the Dandelion++ paper.
/// But can be overridden to allow a node to fluff our txs if desired.
/// If set to false we will stem/fluff our txs as per current epoch.
const DANDELION_ALWAYS_STEM_OUR_TXS: bool = true;

/// Configuration for "Dandelion".
/// Note: shared between p2p and pool.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DandelionConfig {
/// Length of each "epoch".
#[serde(default = "default_dandelion_epoch_secs")]
pub epoch_secs: Option<u16>,
pub epoch_secs: u16,
/// Dandelion embargo timer. Fluff and broadcast individual txs if not seen
/// on network before embargo expires.
#[serde(default = "default_dandelion_embargo_secs")]
pub embargo_secs: Option<u16>,
pub embargo_secs: u16,
/// Dandelion aggregation timer.
#[serde(default = "default_dandelion_aggregation_secs")]
pub aggregation_secs: Option<u16>,
pub aggregation_secs: u16,
/// Dandelion stem probability (stem 90% of the time, fluff 10% etc.)
#[serde(default = "default_dandelion_stem_probability")]
pub stem_probability: Option<u8>,
pub stem_probability: u8,
/// Default to always stem our txs as described in Dandelion++ paper.
#[serde(default = "default_dandelion_always_stem_our_txs")]
pub always_stem_our_txs: bool,
}

impl Default for DandelionConfig {
Expand All @@ -65,24 +74,29 @@ impl Default for DandelionConfig {
embargo_secs: default_dandelion_embargo_secs(),
aggregation_secs: default_dandelion_aggregation_secs(),
stem_probability: default_dandelion_stem_probability(),
always_stem_our_txs: default_dandelion_always_stem_our_txs(),
}
}
}

fn default_dandelion_epoch_secs() -> Option<u16> {
Some(DANDELION_EPOCH_SECS)
fn default_dandelion_epoch_secs() -> u16 {
DANDELION_EPOCH_SECS
}

fn default_dandelion_embargo_secs() -> u16 {
DANDELION_EMBARGO_SECS
}

fn default_dandelion_embargo_secs() -> Option<u16> {
Some(DANDELION_EMBARGO_SECS)
fn default_dandelion_aggregation_secs() -> u16 {
DANDELION_AGGREGATION_SECS
}

fn default_dandelion_aggregation_secs() -> Option<u16> {
Some(DANDELION_AGGREGATION_SECS)
fn default_dandelion_stem_probability() -> u8 {
DANDELION_STEM_PROBABILITY
}

fn default_dandelion_stem_probability() -> Option<u8> {
Some(DANDELION_STEM_PROBABILITY)
fn default_dandelion_always_stem_our_txs() -> bool {
DANDELION_ALWAYS_STEM_OUR_TXS
}

/// Transaction pool configuration
Expand Down Expand Up @@ -145,20 +159,29 @@ pub struct PoolEntry {
pub tx: Transaction,
}

/// Placeholder: the data representing where we heard about a tx from.
///
/// Used to make decisions based on transaction acceptance priority from
/// various sources. For example, a node may want to bypass pool size
/// restrictions when accepting a transaction from a local wallet.
///
/// Most likely this will evolve to contain some sort of network identifier,
/// once we get a better sense of what transaction building might look like.
#[derive(Clone, Debug)]
pub struct TxSource {
/// Human-readable name used for logging and errors.
pub debug_name: String,
/// Unique identifier used to distinguish this peer from others.
pub identifier: String,
#[derive(Clone, Debug, PartialEq)]
pub enum TxSource {
PushApi,
Broadcast,
Fluff,
EmbargoExpired,
Deaggregate,
}

impl TxSource {
/// Convenience fn for checking if this tx was sourced via the push api.
pub fn is_pushed(&self) -> bool {
match self {
TxSource::PushApi => true,
_ => false,
}
}
}

/// Possible errors when interacting with the transaction pool.
Expand Down Expand Up @@ -250,19 +273,19 @@ pub trait BlockChain: Sync + Send {
/// importantly the broadcasting of transactions to our peers.
pub trait PoolAdapter: Send + Sync {
/// The transaction pool has accepted this transaction as valid.
fn tx_accepted(&self, tx: &transaction::Transaction);
fn tx_accepted(&self, entry: &PoolEntry);

/// The stem transaction pool has accepted this transactions as valid.
fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>;
fn stem_tx_accepted(&self, entry: &PoolEntry) -> Result<(), PoolError>;
}

/// Dummy adapter used as a placeholder for real implementations
#[allow(dead_code)]
pub struct NoopAdapter {}

impl PoolAdapter for NoopAdapter {
fn tx_accepted(&self, _tx: &transaction::Transaction) {}
fn stem_tx_accepted(&self, _tx: &transaction::Transaction) -> Result<(), PoolError> {
fn tx_accepted(&self, _entry: &PoolEntry) {}
fn stem_tx_accepted(&self, _entry: &PoolEntry) -> Result<(), PoolError> {
Ok(())
}
}
5 changes: 1 addition & 4 deletions pool/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,7 @@ where
}

pub fn test_source() -> TxSource {
TxSource {
debug_name: format!("test"),
identifier: format!("127.0.0.1"),
}
TxSource::Broadcast
}

pub fn clean_output_dir(db_root: String) {
Expand Down
6 changes: 4 additions & 2 deletions pool/tests/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use self::core::core::{transaction, Block, BlockHeader, Weighting};
use self::core::libtx;
use self::core::pow::Difficulty;
use self::keychain::{ExtKeychain, Keychain};
use self::pool::TxSource;
use self::util::RwLock;
use crate::common::*;
use grin_core as core;
use grin_keychain as keychain;
use grin_pool as pool;
use grin_util as util;
use std::sync::Arc;

Expand Down Expand Up @@ -237,7 +239,7 @@ fn test_the_transaction_pool() {
assert_eq!(write_pool.total_size(), 6);
let entry = write_pool.txpool.entries.last().unwrap();
assert_eq!(entry.tx.kernels().len(), 1);
assert_eq!(entry.src.debug_name, "deagg");
assert_eq!(entry.src, TxSource::Deaggregate);
}

// Check we cannot "double spend" an output spent in a previous block.
Expand Down Expand Up @@ -447,7 +449,7 @@ fn test_the_transaction_pool() {
assert_eq!(write_pool.total_size(), 6);
let entry = write_pool.txpool.entries.last().unwrap();
assert_eq!(entry.tx.kernels().len(), 1);
assert_eq!(entry.src.debug_name, "deagg");
assert_eq!(entry.src, TxSource::Deaggregate);
}

// Check we cannot "double spend" an output spent in a previous block.
Expand Down
19 changes: 8 additions & 11 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::core::{core, global};
use crate::p2p;
use crate::p2p::types::PeerInfo;
use crate::pool;
use crate::pool::types::DandelionConfig;
use crate::util::OneTime;
use chrono::prelude::*;
use chrono::Duration;
Expand Down Expand Up @@ -97,10 +96,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
return Ok(true);
}

let source = pool::TxSource {
debug_name: "p2p".to_string(),
identifier: "?.?.?.?".to_string(),
};
let source = pool::TxSource::Broadcast;

let header = self.chain().head_header()?;

Expand Down Expand Up @@ -804,21 +800,22 @@ impl DandelionAdapter for PoolToNetAdapter {
}

impl pool::PoolAdapter for PoolToNetAdapter {
fn tx_accepted(&self, tx: &core::Transaction) {
self.peers().broadcast_transaction(tx);
fn tx_accepted(&self, entry: &pool::PoolEntry) {
self.peers().broadcast_transaction(&entry.tx);
}

fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
fn stem_tx_accepted(&self, entry: &pool::PoolEntry) -> Result<(), pool::PoolError> {
// Take write lock on the current epoch.
// We need to be able to update the current relay peer if not currently connected.
let mut epoch = self.dandelion_epoch.write();

// If "stem" epoch attempt to relay the tx to the next Dandelion relay.
// Fallback to immediately fluffing the tx if we cannot stem for any reason.
// If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor).
if epoch.is_stem() {
// If node is configured to always stem our (pushed via api) txs then do so.
if epoch.is_stem() || (entry.src.is_pushed() && epoch.always_stem_our_txs()) {
if let Some(peer) = epoch.relay_peer(&self.peers()) {
match peer.send_stem_transaction(tx) {
match peer.send_stem_transaction(&entry.tx) {
Ok(_) => {
info!("Stemming this epoch, relaying to next peer.");
Ok(())
Expand All @@ -841,7 +838,7 @@ impl pool::PoolAdapter for PoolToNetAdapter {

impl PoolToNetAdapter {
/// Create a new pool to net adapter
pub fn new(config: DandelionConfig) -> PoolToNetAdapter {
pub fn new(config: pool::DandelionConfig) -> PoolToNetAdapter {
PoolToNetAdapter {
peers: OneTime::new(),
dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))),
Expand Down
14 changes: 8 additions & 6 deletions servers/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ impl DandelionEpoch {
match self.start_time {
None => true,
Some(start_time) => {
let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64;
Utc::now().timestamp().saturating_sub(start_time) > epoch_secs
let epoch_secs = self.config.epoch_secs;
Utc::now().timestamp().saturating_sub(start_time) > epoch_secs as i64
}
}
}
Expand All @@ -511,10 +511,7 @@ impl DandelionEpoch {

// If stem_probability == 90 then we stem 90% of the time.
let mut rng = rand::thread_rng();
let stem_probability = self
.config
.stem_probability
.expect("stem_probability config missing");
let stem_probability = self.config.stem_probability;
self.is_stem = rng.gen_range(0, 100) < stem_probability;

let addr = self.relay_peer.clone().map(|p| p.info.addr);
Expand All @@ -529,6 +526,11 @@ impl DandelionEpoch {
self.is_stem
}

/// Always stem our (pushed via api) txs regardless of stem/fluff epoch?
pub fn always_stem_our_txs(&self) -> bool {
self.config.always_stem_our_txs
}

/// What is our current relay peer?
/// If it is not connected then choose a new one.
pub fn relay_peer(&mut self, peers: &Arc<p2p::Peers>) -> Option<Arc<p2p::Peer>> {
Expand Down
23 changes: 4 additions & 19 deletions servers/src/grin/dandelion_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ fn process_fluff_phase(
return Ok(());
}

let cutoff_secs = dandelion_config
.aggregation_secs
.expect("aggregation secs config missing");
let cutoff_secs = dandelion_config.aggregation_secs;
let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);

// If epoch is expired, fluff *all* outstanding entries in stempool.
Expand Down Expand Up @@ -149,12 +147,7 @@ fn process_fluff_phase(
verifier_cache.clone(),
)?;

let src = TxSource {
debug_name: "fluff".to_string(),
identifier: "?.?.?.?".to_string(),
};

tx_pool.add_to_pool(src, agg_tx, false, &header)?;
tx_pool.add_to_pool(TxSource::Fluff, agg_tx, false, &header)?;
Ok(())
}

Expand All @@ -165,10 +158,7 @@ fn process_expired_entries(
// Take a write lock on the txpool for the duration of this processing.
let mut tx_pool = tx_pool.write();

let embargo_secs = dandelion_config
.embargo_secs
.expect("embargo_secs config missing")
+ thread_rng().gen_range(0, 31);
let embargo_secs = dandelion_config.embargo_secs + thread_rng().gen_range(0, 31);
let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs);

if expired_entries.is_empty() {
Expand All @@ -179,14 +169,9 @@ fn process_expired_entries(

let header = tx_pool.chain_head()?;

let src = TxSource {
debug_name: "embargo_expired".to_string(),
identifier: "?.?.?.?".to_string(),
};

for entry in expired_entries {
let txhash = entry.tx.hash();
match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) {
match tx_pool.add_to_pool(TxSource::EmbargoExpired, entry.tx, false, &header) {
Ok(_) => info!(
"dand_mon: embargo expired for {}, fluffed successfully.",
txhash
Expand Down