Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Hook RPC extrinsic import into propagation #158

Merged
merged 6 commits into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions polkadot/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ tokio-core = "0.1.12"
futures = "0.1.17"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
fdlimit = "0.1"
parking_lot = "0.4"
substrate-client = { path = "../../substrate/client" }
substrate-network = { path = "../../substrate/network" }
substrate-codec = { path = "../../substrate/codec" }
substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-state-machine = { path = "../../substrate/state-machine" }
substrate-executor = { path = "../../substrate/executor" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-rpc = { path = "../../substrate/rpc" }
substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
polkadot-primitives = { path = "../primitives" }
polkadot-executor = { path = "../executor" }
polkadot-runtime = { path = "../runtime" }
polkadot-service = { path = "../service" }
polkadot-transaction-pool = { path = "../transaction-pool" }
39 changes: 38 additions & 1 deletion polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
extern crate triehash;
extern crate parking_lot;

extern crate substrate_codec as codec;
extern crate substrate_state_machine as state_machine;
extern crate substrate_client as client;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
extern crate substrate_rpc;
extern crate substrate_rpc_servers as rpc;
extern crate substrate_runtime_support as runtime_support;
extern crate polkadot_primitives;
extern crate polkadot_executor;
extern crate polkadot_runtime;
extern crate polkadot_service as service;
extern crate polkadot_transaction_pool as txpool;

#[macro_use]
extern crate lazy_static;
Expand All @@ -57,10 +61,39 @@ mod informant;
use std::io;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use tokio_core::reactor;
use parking_lot::Mutex;
use service::ChainSpec;
use primitives::block::Extrinsic;

struct RpcTransactionPool {
inner: Arc<Mutex<txpool::TransactionPool>>,
network: Arc<network::Service>,
}

impl substrate_rpc::author::AuthorApi for RpcTransactionPool {
fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
use primitives::hexdisplay::HexDisplay;
use polkadot_runtime::UncheckedExtrinsic;
use codec::Slicable;

info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;

info!("Correctly formatted: {:?}", decoded);

self.inner.lock().import(decoded)
.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?;

self.network.trigger_repropagate();
Ok(())
}
}

/// Parse command line arguments and start the node.
///
Expand Down Expand Up @@ -172,7 +205,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where

let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
rpc::rpc_handler(service.client(), chain, service.transaction_pool())
let pool = RpcTransactionPool {
inner: service.transaction_pool(),
network: service.network(),
};
rpc::rpc_handler(service.client(), chain, pool)
};
(
start_server(http_address, |address| rpc::start_http(address, handler())),
Expand Down
16 changes: 0 additions & 16 deletions polkadot/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
extern crate ed25519;
extern crate ethereum_types;
extern crate substrate_codec as codec;
extern crate substrate_rpc;
extern crate substrate_primitives as substrate_primitives;
extern crate substrate_runtime_primitives as substrate_runtime_primitives;
extern crate polkadot_runtime as runtime;
Expand All @@ -35,10 +34,8 @@ use std::collections::HashMap;
use std::cmp::Ordering;
use std::sync::Arc;

use codec::Slicable;
use polkadot_api::PolkadotApi;
use primitives::{AccountId, Timestamp};
use substrate_primitives::block::Extrinsic;
use runtime::{Block, UncheckedExtrinsic, TimestampCall, Call};
use substrate_runtime_primitives::traits::Checkable;
use transaction_pool::{Pool, Readiness};
Expand Down Expand Up @@ -380,19 +377,6 @@ impl TransactionPool {
}
}

impl substrate_rpc::author::AsyncAuthorApi for TransactionPool {
fn submit_extrinsic(&mut self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
use substrate_primitives::hexdisplay::HexDisplay;
info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
let xt = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;
info!("Correctly formatted: {:?}", xt);
self.import(xt)
.map(|_| ())
.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError.into())
}
}

#[cfg(test)]
mod tests {
}
11 changes: 8 additions & 3 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl Protocol {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
return;
}
trace!(target: "sync", "Received {} transactions from {}", peer_id, transactions.len());
trace!(target: "sync", "Received {} transactions from {}", transactions.len(), peer_id);
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
for t in transactions {
Expand All @@ -445,12 +445,17 @@ impl Protocol {
}
}

/// Called when peer sends us new transactions
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
/// Called when we propagate ready transactions to peers.
pub fn propagate_transactions(&self, io: &mut SyncIo) {
debug!(target: "sync", "Propagating transactions");

// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
return;
}

let transactions = self.transaction_pool.transactions();

let mut peers = self.peers.write();
for (peer_id, ref mut peer) in peers.iter_mut() {
let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)|
Expand Down
25 changes: 20 additions & 5 deletions substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::sync::Arc;
use std::collections::{BTreeMap};
use std::io;
use std::time::Duration;
use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
Expand All @@ -41,6 +42,12 @@ pub type StatementStream = mpsc::UnboundedReceiver<Statement>;
/// Type that represents bft messages stream.
pub type BftMessageStream = mpsc::UnboundedReceiver<LocalizedBftMessage>;

const TICK_TOKEN: TimerToken = 0;
const TICK_TIMEOUT: Duration = Duration::from_millis(1000);

const PROPAGATE_TOKEN: TimerToken = 1;
const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);

bitflags! {
/// Node roles bitmask.
pub struct Role: u32 {
Expand Down Expand Up @@ -162,9 +169,9 @@ impl Service {
}

/// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
pub fn trigger_repropagate(&self) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context));
});
}

Expand Down Expand Up @@ -268,7 +275,11 @@ impl ConsensusService for Service {

impl NetworkProtocolHandler for ProtocolHandler {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
io.register_timer(0, ::std::time::Duration::from_millis(1000)).expect("Error registering sync timer");
io.register_timer(TICK_TOKEN, TICK_TIMEOUT)
.expect("Error registering sync timer");

io.register_timer(PROPAGATE_TOKEN, PROPAGATE_TIMEOUT)
.expect("Error registering transaction propagation timer");
}

fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) {
Expand All @@ -283,8 +294,12 @@ impl NetworkProtocolHandler for ProtocolHandler {
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
}

fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
self.protocol.tick(&mut NetSyncIo::new(io));
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
match timer {
TICK_TOKEN => self.protocol.tick(&mut NetSyncIo::new(io)),
PROPAGATE_TOKEN => self.protocol.propagate_transactions(&mut NetSyncIo::new(io)),
_ => {}
}
}
}

Expand Down
14 changes: 0 additions & 14 deletions substrate/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

//! Substrate block-author/full-node API.

use std::sync::Arc;
use parking_lot::Mutex;
use primitives::block::Extrinsic;

pub mod error;
Expand All @@ -35,15 +33,3 @@ build_rpc_trait! {
fn submit_extrinsic(&self, Extrinsic) -> Result<()>;
}
}

/// Variant of the AuthorApi that doesn't need to be Sync + Send + 'static.
pub trait AsyncAuthorApi: Send + 'static {
/// Submit extrinsic for inclusion in block.
fn submit_extrinsic(&mut self, Extrinsic) -> Result<()>;
}

impl<T: AsyncAuthorApi> AuthorApi for Arc<Mutex<T>> {
fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> {
self.as_ref().lock().submit_extrinsic(xt)
}
}
14 changes: 9 additions & 5 deletions substrate/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use primitives::block;
use super::*;
use super::error::*;

use std::sync::Arc;
use parking_lot::Mutex;
use primitives::block;

#[derive(Default)]
struct DummyTxPool {
submitted: Vec<block::Extrinsic>,
}

impl AsyncAuthorApi for DummyTxPool {
impl AuthorApi for Arc<Mutex<DummyTxPool>> {
/// Submit extrinsic for inclusion in block.
fn submit_extrinsic(&mut self, xt: Extrinsic) -> Result<()> {
if self.submitted.len() < 1 {
self.submitted.push(xt);
fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> {
let mut s = self.lock();
if s.submitted.len() < 1 {
s.submitted.push(xt);
Ok(())
} else {
Err(ErrorKind::PoolError.into())
Expand Down