From ceacd3d22add88df19daca0bd6108d35462ebc0d Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 14 May 2018 13:21:12 +0200 Subject: [PATCH 1/6] call `on_new_transactions` when we import --- Cargo.lock | 3 +++ polkadot/cli/Cargo.toml | 3 +++ polkadot/cli/src/lib.rs | 38 +++++++++++++++++++++++++++- polkadot/transaction-pool/src/lib.rs | 14 ---------- substrate/rpc/src/author/mod.rs | 12 --------- 5 files changed, 43 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84210a9849164..733430322e59f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1297,16 +1297,19 @@ dependencies = [ "hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-executor 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", "polkadot-service 0.1.0", + "polkadot-transaction-pool 0.1.0", "regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-executor 0.1.0", "substrate-network 0.1.0", "substrate-primitives 0.1.0", + "substrate-rpc 0.1.0", "substrate-rpc-servers 0.1.0", "substrate-runtime-support 0.1.0", "substrate-state-machine 0.1.0", diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index d956a0290b2f9..5ec179b273493 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -22,6 +22,7 @@ tokio-core = "0.1.12" futures = "0.1.17" ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } fdlimit = "0.1" +parking_lot = "0.4" substrate-client = { path = "../../substrate/client" } substrate-network = { path = "../../substrate/network" } substrate-codec = { path = "../../substrate/codec" } @@ -29,8 +30,10 @@ substrate-runtime-support = { path = "../../substrate/runtime-support" } substrate-state-machine = { path = "../../substrate/state-machine" } substrate-executor = { path = "../../substrate/executor" } substrate-primitives = { path = "../../substrate/primitives" } +substrate-rpc = { path = "../../substrate/rpc" } substrate-rpc-servers = { path = "../../substrate/rpc-servers" } polkadot-primitives = { path = "../primitives" } polkadot-executor = { path = "../executor" } polkadot-runtime = { path = "../runtime" } polkadot-service = { path = "../service" } +polkadot-transaction-pool = { path = "../transaction-pool" } diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index c469b7b504cd1..02ab93fa1934a 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -30,17 +30,21 @@ extern crate ctrlc; extern crate fdlimit; extern crate ed25519; extern crate triehash; +extern crate parking_lot; + extern crate substrate_codec as codec; extern crate substrate_state_machine as state_machine; extern crate substrate_client as client; extern crate substrate_primitives as primitives; extern crate substrate_network as network; +extern crate substrate_rpc; extern crate substrate_rpc_servers as rpc; extern crate substrate_runtime_support as runtime_support; extern crate polkadot_primitives; extern crate polkadot_executor; extern crate polkadot_runtime; extern crate polkadot_service as service; +extern crate polkadot_transaction_pool as txpool; #[macro_use] extern crate lazy_static; @@ -57,10 +61,38 @@ mod informant; use std::io; use std::net::SocketAddr; use std::path::{Path, PathBuf}; +use std::sync::Arc; + use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use tokio_core::reactor; +use parking_lot::Mutex; use service::ChainSpec; +use primitives::block::Extrinsic; + +struct RpcTransactionPool { + inner: Arc>, + network: Arc, +} + +impl substrate_rpc::author::AuthorApi for RpcTransactionPool { + fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> { + use primitives::hexdisplay::HexDisplay; + use polkadot_runtime::UncheckedExtrinsic; + use codec::Slicable; + + info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); + let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) + .ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?; + info!("Correctly formatted: {:?}", decoded); + let verified = self.inner.lock().import(decoded) + .map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?; + + let hash = verified.hash(); + self.network.on_new_transactions(&[(hash.0.into(), xt.0)]); + Ok(()) + } +} /// Parse command line arguments and start the node. /// @@ -172,7 +204,11 @@ pub fn run(args: I) -> error::Result<()> where let handler = || { let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); - rpc::rpc_handler(service.client(), chain, service.transaction_pool()) + let pool = RpcTransactionPool { + inner: service.transaction_pool(), + network: service.network(), + }; + rpc::rpc_handler(service.client(), chain, pool) }; ( start_server(http_address, |address| rpc::start_http(address, handler())), diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index 555f2a8402b23..aa7706c84bb66 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -17,7 +17,6 @@ extern crate ed25519; extern crate ethereum_types; extern crate substrate_codec as codec; -extern crate substrate_rpc; extern crate substrate_primitives as substrate_primitives; extern crate substrate_runtime_primitives as substrate_runtime_primitives; extern crate polkadot_runtime as runtime; @@ -380,19 +379,6 @@ impl TransactionPool { } } -impl substrate_rpc::author::AsyncAuthorApi for TransactionPool { - fn submit_extrinsic(&mut self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> { - use substrate_primitives::hexdisplay::HexDisplay; - info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); - let xt = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) - .ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?; - info!("Correctly formatted: {:?}", xt); - self.import(xt) - .map(|_| ()) - .map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError.into()) - } -} - #[cfg(test)] mod tests { } diff --git a/substrate/rpc/src/author/mod.rs b/substrate/rpc/src/author/mod.rs index 61683e77b18b3..c9ede2ac98fac 100644 --- a/substrate/rpc/src/author/mod.rs +++ b/substrate/rpc/src/author/mod.rs @@ -35,15 +35,3 @@ build_rpc_trait! { fn submit_extrinsic(&self, Extrinsic) -> Result<()>; } } - -/// Variant of the AuthorApi that doesn't need to be Sync + Send + 'static. -pub trait AsyncAuthorApi: Send + 'static { - /// Submit extrinsic for inclusion in block. - fn submit_extrinsic(&mut self, Extrinsic) -> Result<()>; -} - -impl AuthorApi for Arc> { - fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> { - self.as_ref().lock().submit_extrinsic(xt) - } -} From cb7013ea2a1cb7abd9af0bb5c914e7254c99cac5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 14 May 2018 13:41:24 +0200 Subject: [PATCH 2/6] fix trace --- substrate/network/src/protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 5a66b21f0ed16..62b33f664d312 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -434,7 +434,7 @@ impl Protocol { trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id); return; } - trace!(target: "sync", "Received {} transactions from {}", peer_id, transactions.len()); + trace!(target: "sync", "Received {} transactions from {}", transactions.len(), peer_id); let mut peers = self.peers.write(); if let Some(ref mut peer) = peers.get_mut(&peer_id) { for t in transactions { From b5a4fed20d2f80334742ee12012c5df02d9bcbe2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 14 May 2018 14:44:09 +0200 Subject: [PATCH 3/6] pass correct bytes to network --- polkadot/cli/src/lib.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 02ab93fa1934a..97c53cefeb11d 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -82,14 +82,17 @@ impl substrate_rpc::author::AuthorApi for RpcTransactionPool { use codec::Slicable; info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); - let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) + let (decoded, encoded_bytes) = xt.using_encoded(|s| { + UncheckedExtrinsic::decode(&mut &s[..]).map(|d| (d, s.to_vec())) + }) .ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?; info!("Correctly formatted: {:?}", decoded); - let verified = self.inner.lock().import(decoded) + + let hash = self.inner.lock().import(decoded) + .map(|v| v.hash().0.into()) .map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?; - let hash = verified.hash(); - self.network.on_new_transactions(&[(hash.0.into(), xt.0)]); + self.network.on_new_transactions(&[(hash, encoded_bytes)]); Ok(()) } } From 69f0bb35eec5db810ec6c09d2491a958e951aab6 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 14 May 2018 15:11:59 +0200 Subject: [PATCH 4/6] clean up --- polkadot/transaction-pool/src/lib.rs | 2 -- substrate/rpc/src/author/mod.rs | 2 -- substrate/rpc/src/author/tests.rs | 14 +++++++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index aa7706c84bb66..5e88a56be8688 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -34,10 +34,8 @@ use std::collections::HashMap; use std::cmp::Ordering; use std::sync::Arc; -use codec::Slicable; use polkadot_api::PolkadotApi; use primitives::{AccountId, Timestamp}; -use substrate_primitives::block::Extrinsic; use runtime::{Block, UncheckedExtrinsic, TimestampCall, Call}; use substrate_runtime_primitives::traits::Checkable; use transaction_pool::{Pool, Readiness}; diff --git a/substrate/rpc/src/author/mod.rs b/substrate/rpc/src/author/mod.rs index c9ede2ac98fac..e10234282fda5 100644 --- a/substrate/rpc/src/author/mod.rs +++ b/substrate/rpc/src/author/mod.rs @@ -16,8 +16,6 @@ //! Substrate block-author/full-node API. -use std::sync::Arc; -use parking_lot::Mutex; use primitives::block::Extrinsic; pub mod error; diff --git a/substrate/rpc/src/author/tests.rs b/substrate/rpc/src/author/tests.rs index 5306664fe11e4..8a2c3eb2e3f79 100644 --- a/substrate/rpc/src/author/tests.rs +++ b/substrate/rpc/src/author/tests.rs @@ -14,20 +14,24 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use primitives::block; use super::*; use super::error::*; +use std::sync::Arc; +use parking_lot::Mutex; +use primitives::block; + #[derive(Default)] struct DummyTxPool { submitted: Vec, } -impl AsyncAuthorApi for DummyTxPool { +impl AuthorApi for Arc> { /// Submit extrinsic for inclusion in block. - fn submit_extrinsic(&mut self, xt: Extrinsic) -> Result<()> { - if self.submitted.len() < 1 { - self.submitted.push(xt); + fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> { + let mut s = self.lock(); + if s.submitted.len() < 1 { + s.submitted.push(xt); Ok(()) } else { Err(ErrorKind::PoolError.into()) From c3571df39fabec2d664ea6475855f9b346149306 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 14 May 2018 16:17:29 +0200 Subject: [PATCH 5/6] cull before repropagating; repropagate on timer --- polkadot/cli/src/lib.rs | 10 ++++------ substrate/network/src/protocol.rs | 5 ++++- substrate/network/src/service.rs | 25 ++++++++++++++++++++----- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 97c53cefeb11d..604c665910230 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -82,17 +82,15 @@ impl substrate_rpc::author::AuthorApi for RpcTransactionPool { use codec::Slicable; info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); - let (decoded, encoded_bytes) = xt.using_encoded(|s| { - UncheckedExtrinsic::decode(&mut &s[..]).map(|d| (d, s.to_vec())) - }) + let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) .ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?; + info!("Correctly formatted: {:?}", decoded); - let hash = self.inner.lock().import(decoded) - .map(|v| v.hash().0.into()) + self.inner.lock().import(decoded) .map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?; - self.network.on_new_transactions(&[(hash, encoded_bytes)]); + self.network.trigger_repropagate(); Ok(()) } } diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 62b33f664d312..af6bcb10dbee3 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -446,11 +446,14 @@ impl Protocol { } /// Called when peer sends us new transactions - pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec)]) { + pub fn propagate_transactions(&self, io: &mut SyncIo) { // Accept transactions only when fully synced if self.sync.read().status().state != SyncState::Idle { return; } + + let transactions = self.transaction_pool.transactions(); + let mut peers = self.peers.write(); for (peer_id, ref mut peer) in peers.iter_mut() { let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)| diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 29c70b70a4018..377cbf8a92f4b 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::collections::{BTreeMap}; use std::io; +use std::time::Duration; use futures::sync::{oneshot, mpsc}; use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId, NetworkConfiguration , NonReservedPeerMode, ErrorKind}; @@ -41,6 +42,12 @@ pub type StatementStream = mpsc::UnboundedReceiver; /// Type that represents bft messages stream. pub type BftMessageStream = mpsc::UnboundedReceiver; +const TICK_TOKEN: TimerToken = 0; +const TICK_TIMEOUT: Duration = Duration::from_millis(1000); + +const PROPAGATE_TOKEN: TimerToken = 1; +const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000); + bitflags! { /// Node roles bitmask. pub struct Role: u32 { @@ -162,9 +169,9 @@ impl Service { } /// Called when new transactons are imported by the client. - pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec)]) { + pub fn trigger_repropagate(&self) { self.network.with_context(DOT_PROTOCOL_ID, |context| { - self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions); + self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context)); }); } @@ -268,7 +275,11 @@ impl ConsensusService for Service { impl NetworkProtocolHandler for ProtocolHandler { fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { - io.register_timer(0, ::std::time::Duration::from_millis(1000)).expect("Error registering sync timer"); + io.register_timer(TICK_TOKEN, TICK_TIMEOUT) + .expect("Error registering sync timer"); + + io.register_timer(PROPAGATE_TOKEN, PROPAGATE_TIMEOUT) + .expect("Error registering transaction propagation timer"); } fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) { @@ -283,8 +294,12 @@ impl NetworkProtocolHandler for ProtocolHandler { self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer); } - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { - self.protocol.tick(&mut NetSyncIo::new(io)); + fn timeout(&self, io: &NetworkContext, timer: TimerToken) { + match timer { + TICK_TOKEN => self.protocol.tick(&mut NetSyncIo::new(io)), + PROPAGATE_TOKEN => self.protocol.propagate_transactions(&mut NetSyncIo::new(io)), + _ => {} + } } } From 4951533d57fef8c432a26c6cd04ea9a5342d53e3 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 14 May 2018 16:35:44 +0200 Subject: [PATCH 6/6] add a little tracing --- substrate/network/src/protocol.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index af6bcb10dbee3..b2b87e80ff553 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -445,8 +445,10 @@ impl Protocol { } } - /// Called when peer sends us new transactions + /// Called when we propagate ready transactions to peers. pub fn propagate_transactions(&self, io: &mut SyncIo) { + debug!(target: "sync", "Propagating transactions"); + // Accept transactions only when fully synced if self.sync.read().status().state != SyncState::Idle { return;