Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
change propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
NikVolf committed May 7, 2020
1 parent 35e3930 commit c6a7ed7
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 57 deletions.
36 changes: 23 additions & 13 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multia
#[doc(hidden)]
pub use crate::protocol::ProtocolConfig;

use crate::{ExHashT, ReportHandle};
use crate::ExHashT;

use core::{fmt, iter};
use futures::future;
use libp2p::identity::{ed25519, Keypair};
use libp2p::wasm_ext;
use libp2p::{multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
use sc_peerset::ReputationChange;
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{borrow::Cow, convert::TryFrom, future::Future, pin::Pin, str::FromStr};
Expand Down Expand Up @@ -167,6 +167,22 @@ impl<B: BlockT> FinalityProofRequestBuilder<B> for DummyFinalityProofRequestBuil
/// Shared finality proof request builder struct used by the queue.
pub type BoxFinalityProofRequestBuilder<B> = Box<dyn FinalityProofRequestBuilder<B> + Send + Sync>;

/// Result of the transaction import.
#[derive(Clone, Copy, Debug)]
pub enum TransactionImport {
/// Transaction is good but already known by the transaction pool.
KnownGood,
/// Transaction is good and not yet known.
NewGood,
/// Transaction is invalid.
Bad,
/// Transaction import was not performed.
None,
}

/// Fuure resolving to transaction import result.
pub type TransactionImportFuture = Pin<Box<dyn Future<Output=TransactionImport> + Send>>;

/// Transaction pool interface
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
Expand All @@ -175,15 +191,11 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
fn hash_of(&self, transaction: &B::Extrinsic) -> H;
/// Import a transaction into the pool.
///
/// Peer reputation is changed by reputation_change if transaction is accepted by the pool.
/// This will return future.
fn import(
&self,
report_handle: ReportHandle,
who: PeerId,
reputation_change_good: ReputationChange,
reputation_change_bad: ReputationChange,
transaction: B::Extrinsic,
);
) -> TransactionImportFuture;
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
/// Get transaction by hash.
Expand All @@ -209,12 +221,10 @@ impl<H: ExHashT + Default, B: BlockT> TransactionPool<H, B> for EmptyTransaction

fn import(
&self,
_report_handle: ReportHandle,
_who: PeerId,
_rep_change_good: ReputationChange,
_rep_change_bad: ReputationChange,
_transaction: B::Extrinsic
) {}
) -> TransactionImportFuture {
Box::pin(future::ready(TransactionImport::KnownGood))
}

fn on_broadcasted(&self, _: HashMap<H, Vec<String>>) {}

Expand Down
48 changes: 38 additions & 10 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use crate::{
ExHashT,
chain::{Client, FinalityProofProvider},
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool},
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
error,
utils::interval
};
Expand Down Expand Up @@ -101,6 +101,13 @@ mod rep {
pub const UNEXPECTED_STATUS: Rep = Rep::new(-(1 << 20), "Unexpected status message");
/// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// Reputation change when a peer sends us any extrinsic.
///
/// This forces node to verify it, thus the negative value here. Once extrinsic is verified,
/// reputation change should be refunded with `ANY_EXTRINSIC_REFUND`
pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic");
/// Reputation change when a peer sends us any extrinsic that is not invalid.
pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)");
/// Reputation change when a peer sends us an extrinsic that we didn't know about.
pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic");
/// Reputation change when a peer sends us a bad extrinsic.
Expand Down Expand Up @@ -190,6 +197,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
/// Pending extrinsic verification tasks.
pending_transactions: VecDeque<(PeerId, TransactionImportFuture)>,
config: ProtocolConfig,
genesis_hash: B::Hash,
sync: ChainSync<B>,
Expand Down Expand Up @@ -394,6 +403,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
pending_messages: VecDeque::new(),
pending_transactions: VecDeque::new(),
config,
context_data: ContextData {
peers: HashMap::new(),
Expand Down Expand Up @@ -1121,17 +1131,25 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let hash = self.transaction_pool.hash_of(&t);
peer.known_extrinsics.insert(hash);

self.transaction_pool.import(
self.peerset_handle.clone().into(),
self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC);

self.pending_transactions.push_back((
who.clone(),
rep::GOOD_EXTRINSIC,
rep::BAD_EXTRINSIC,
t,
);
self.transaction_pool.import(t),
));
}
}
}

fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) {
match import {
TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND),
TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC),
TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC),
TransactionImport::None => {},
}
}

/// Propagate one extrinsic.
pub fn propagate_extrinsic(
&mut self,
Expand Down Expand Up @@ -1953,7 +1971,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
GenericMessage::BlockRequest(r),
)
}
}
Expand All @@ -1970,7 +1988,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
GenericMessage::BlockRequest(r),
)
}
}
Expand All @@ -1988,9 +2006,19 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::FinalityProofRequest(r))
GenericMessage::FinalityProofRequest(r),
)
}
}
if let Some((peer_id, mut pending_transaction)) = self.pending_transactions.pop_front() {
match pending_transaction.poll_unpin(cx) {
Poll::Ready(result) => self.on_handle_extrinsic_import(peer_id, result),
Poll::Pending => {
self.pending_transactions.push_front((peer_id, pending_transaction));
return Poll::Pending;
}
}
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}
Expand Down
1 change: 0 additions & 1 deletion client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,6 @@ ServiceBuilder<
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool.clone(),
client: client.clone(),
executor: task_manager.spawn_handle(),
});

let protocol_id = {
Expand Down
68 changes: 35 additions & 33 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use futures::{
sink::SinkExt,
task::{Spawn, FutureObj, SpawnError},
};
use sc_network::{NetworkService, network_state::NetworkState, PeerId, ReportHandle};
use sc_network::{NetworkService, network_state::NetworkState, PeerId};
use log::{log, warn, debug, error, Level};
use codec::{Encode, Decode};
use sp_runtime::generic::BlockId;
Expand All @@ -76,7 +76,10 @@ pub use sc_executor::NativeExecutionDispatch;
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
pub use sc_network::config::{
FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder, TransactionImport,
TransactionImportFuture,
};
pub use sc_tracing::TracingReceiver;
pub use task_manager::SpawnTaskHandle;
use task_manager::TaskManager;
Expand Down Expand Up @@ -616,7 +619,6 @@ pub struct TransactionPoolAdapter<C, P> {
imports_external_transactions: bool,
pool: Arc<P>,
client: Arc<C>,
executor: SpawnTaskHandle,
}

/// Get transactions for propagation.
Expand Down Expand Up @@ -659,42 +661,42 @@ where

fn import(
&self,
report_handle: ReportHandle,
who: PeerId,
reputation_change_good: sc_network::ReputationChange,
reputation_change_bad: sc_network::ReputationChange,
transaction: B::Extrinsic
) {
transaction: B::Extrinsic,
) -> TransactionImportFuture {
if !self.imports_external_transactions {
debug!("Transaction rejected");
return;
Box::pin(futures::future::ready(TransactionImport::None));
}

let encoded = transaction.encode();
match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => {
let best_block_id = BlockId::hash(self.client.info().best_hash);
let source = sp_transaction_pool::TransactionSource::External;
let import_future = self.pool.submit_one(&best_block_id, source, uxt);
let import_future = import_future
.map(move |import_result| {
match import_result {
Ok(_) => report_handle.report_peer(who, reputation_change_good),
Err(e) => match e.into_pool_error() {
Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => (),
Ok(e) => {
report_handle.report_peer(who, reputation_change_bad);
debug!("Error adding transaction to the pool: {:?}", e)
}
Err(e) => debug!("Error converting pool error: {:?}", e),
}
}
});

self.executor.spawn("extrinsic-import", import_future);
let uxt = match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => uxt,
Err(e) => {
debug!("Transaction invalid: {:?}", e);
return Box::pin(futures::future::ready(TransactionImport::Bad));
}
Err(e) => debug!("Error decoding transaction {}", e),
}
};

let best_block_id = BlockId::hash(self.client.info().best_hash);

let import_future = self.pool.submit_one(&best_block_id, sp_transaction_pool::TransactionSource::External, uxt);
Box::pin(async move {
match import_future.await {
Ok(_) => TransactionImport::NewGood,
Err(e) => match e.into_pool_error() {
Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => TransactionImport::KnownGood,
Ok(e) => {
debug!("Error adding transaction to the pool: {:?}", e);
TransactionImport::Bad
}
Err(e) => {
debug!("Error converting pool error: {:?}", e);
// it is not bad at least, just some internal node logic error, so peer is innocent.
TransactionImport::KnownGood
}
}
}
})
}

fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
Expand Down

0 comments on commit c6a7ed7

Please sign in to comment.