diff --git a/Cargo.lock b/Cargo.lock index 77c18e918d1d7..41c7da9793339 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2257,8 +2257,10 @@ dependencies = [ "polkadot-availability-store 0.1.0", "polkadot-consensus 0.1.0", "polkadot-primitives 0.1.0", - "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "slice-group-by 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0 (git+https://github.com/paritytech/substrate)", + "substrate-client 0.1.0 (git+https://github.com/paritytech/substrate)", + "substrate-keyring 0.1.0 (git+https://github.com/paritytech/substrate)", "substrate-network 0.1.0 (git+https://github.com/paritytech/substrate)", "substrate-primitives 0.1.0 (git+https://github.com/paritytech/substrate)", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2681,16 +2683,6 @@ dependencies = [ "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rhododendron" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "ring" version = "0.14.5" @@ -2819,6 +2811,11 @@ name = "scopeguard" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "sdset" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "secp256k1" version = "0.12.2" @@ -2952,6 +2949,14 @@ name = "slab" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "slice-group-by" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "sdset 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "slog" version = "2.4.1" @@ -4963,7 +4968,6 @@ dependencies = [ "checksum regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "37e7cbbd370869ce2e8dff25c7018702d10b21a20ef7135316f8daecd6c25b7f" "checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861" "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" -"checksum rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e20523445e693f394c0e487113ae656071311c9ee4c1e914441bece8c929b21d" "checksum ring 0.14.5 (registry+https://github.com/rust-lang/crates.io-index)" = "148fc853f6d85f53f5f315d46701eaacc565cdfb3cb1959730c96e81e7e49999" "checksum rocksdb 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "39be726e556e6f21d54d21cdf1be9f6df30c0411a5856c1abf3f4bb12498f2ed" "checksum rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f1651697fefd273bfb4fd69466cc2a9d20de557a0213b97233b22b5e95924b5e" @@ -4980,6 +4984,7 @@ dependencies = [ "checksum schnorrkel 0.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fe554f318830b48e5da8ab1ccb1ffd02b79228364dac7766b7cd1ec461ca5116" "checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28" "checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" +"checksum sdset 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c6959a7341a17cbff280a619c3a3c0001d2d6b54661e6d04c3741c3af07cc2c5" "checksum secp256k1 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bfaccd3a23619349e0878d9a241f34b1982343cdf67367058cd7d078d326b63e" "checksum security-framework 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bfab8dda0e7a327c696d893df9ffa19cadc4bd195797997f5223cf5831beaf05" "checksum security-framework-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3d6696852716b589dff9e886ff83778bb635150168e83afa8ac6b8a78cb82abc" @@ -4995,6 +5000,7 @@ dependencies = [ "checksum sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "34a5e54083ce2b934bf059fdf38e7330a154177e029ab6c4e18638f2f624053a" "checksum shell32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9ee04b46101f57121c9da2b151988283b6beb79b34f5bb29a58ee48cb695122c" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +"checksum slice-group-by 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "35538e9d2853b9f041156791bf1b871f27d45f0a2fc816fd90ebea6c63bb3f93" "checksum slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e1a2eec401952cd7b12a84ea120e2d57281329940c3f93c2bf04f462539508e" "checksum slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e544d16c6b230d84c866662fe55e31aacfca6ae71e6fc49ae9a311cb379bfc2f" "checksum slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddc0d2aff1f8f325ef660d9a0eb6e6dcd20b30b3f581a5897f58bf42d061c37a" diff --git a/consensus/src/attestation_service.rs b/consensus/src/attestation_service.rs index d0cb980c981f5..9ed584642ab77 100644 --- a/consensus/src/attestation_service.rs +++ b/consensus/src/attestation_service.rs @@ -27,22 +27,41 @@ use std::thread; use std::time::{Duration, Instant}; use std::sync::Arc; -use client::{BlockchainEvents, ChainHead, BlockBody}; +use client::{error::Result as ClientResult, BlockchainEvents, ChainHead, BlockBody}; use client::block_builder::api::BlockBuilder; use client::blockchain::HeaderBackend; use client::runtime_api::Core; use primitives::ed25519; use futures::prelude::*; use polkadot_primitives::{Block, BlockId}; -use polkadot_primitives::parachain::ParachainHost; +use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost}; use extrinsic_store::Store as ExtrinsicStore; -use runtime_primitives::traits::ProvideRuntimeApi; +use runtime_primitives::traits::{ProvideRuntimeApi, Header as HeaderT}; use tokio::runtime::TaskExecutor; use tokio::runtime::current_thread::Runtime as LocalRuntime; use tokio::timer::Interval; -use super::{Network, Collators}; +use super::{Network, Collators, TableRouter}; + +/// Gets a list of the candidates in a block. +pub(crate) fn fetch_candidates>(client: &P, block: &BlockId) + -> ClientResult>> +{ + use codec::{Encode, Decode}; + use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic}; + + let extrinsics = client.block_body(block)?; + Ok(extrinsics + .into_iter() + .filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice())) + .filter_map(|ex| match ex.function { + Call::Parachains(ParachainsCall::set_heads(heads)) => + Some(heads.into_iter().map(|c| c.candidate)), + _ => None, + }) + .next()) +} // creates a task to prune redundant entries in availability store upon block finalization // @@ -52,47 +71,20 @@ fn prune_unneeded_availability

(client: Arc

, extrinsic_store: ExtrinsicStor -> impl Future + Send where P: Send + Sync + BlockchainEvents + BlockBody + 'static { - use codec::{Encode, Decode}; - use polkadot_primitives::BlockId; - - enum NotifyError { - BodyFetch(::client::error::Error), - } - - impl NotifyError { - fn log(&self, hash: &::polkadot_primitives::Hash) { - match *self { - NotifyError::BodyFetch(ref err) => warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, err), - } - } - } - client.finality_notification_stream() .for_each(move |notification| { - use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic}; - let hash = notification.hash; let parent_hash = notification.header.parent_hash; - let extrinsics = client.block_body(&BlockId::hash(hash)) - .map_err(NotifyError::BodyFetch); - - let extrinsics = match extrinsics { - Ok(r) => r, - Err(e) => { e.log(&hash); return Ok(()) } - }; - - let candidate_hashes = match extrinsics - .iter() - .filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice())) - .filter_map(|ex| match ex.function { - Call::Parachains(ParachainsCall::set_heads(ref heads)) => - Some(heads.iter().map(|c| c.candidate.hash()).collect()), - _ => None, - }) - .next() - { - Some(x) => x, - None => return Ok(()), + let candidate_hashes = match fetch_candidates(&*client, &BlockId::hash(hash)) { + Ok(Some(candidates)) => candidates.map(|c| c.hash()).collect(), + Ok(None) => { + warn!("Could not extract candidates from block body of imported block {:?}", hash); + return Ok(()) + } + Err(e) => { + warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, e); + return Ok(()) + } }; if let Err(e) = extrinsic_store.candidates_finalized(parent_hash, candidate_hashes) { @@ -125,6 +117,7 @@ pub(crate) fn start( P::Api: ParachainHost + Core + BlockBuilder, N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, + <::FetchIncoming as IntoFuture>::Future: Send + 'static, { const TIMER_DELAY: Duration = Duration::from_secs(5); const TIMER_INTERVAL: Duration = Duration::from_secs(30); @@ -148,6 +141,7 @@ pub(crate) fn start( .and_then(|authorities| { consensus.get_or_instantiate( parent_hash, + notification.header.parent_hash().clone(), &authorities, key.clone(), ) diff --git a/consensus/src/collation.rs b/consensus/src/collation.rs index 369de98f6817a..af1918e3890f5 100644 --- a/consensus/src/collation.rs +++ b/consensus/src/collation.rs @@ -26,6 +26,7 @@ use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, Outgoin use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost}; use runtime_primitives::traits::ProvideRuntimeApi; use parachain::{wasm_executor::{self, ExternalitiesError}, MessageRef}; +use super::Incoming; use futures::prelude::*; @@ -34,7 +35,7 @@ use futures::prelude::*; /// This is expected to be a lightweight, shared type like an `Arc`. pub trait Collators: Clone { /// Errors when producing collations. - type Error; + type Error: std::fmt::Debug; /// A full collation. type Collation: IntoFuture; @@ -54,25 +55,33 @@ pub trait Collators: Clone { /// A future which resolves when a collation is available. /// /// This future is fused. -pub struct CollationFetch { +pub struct CollationFetch { parachain: ParaId, relay_parent_hash: Hash, relay_parent: BlockId, collators: C, + incoming: Incoming, live_fetch: Option<::Future>, client: Arc

, } -impl CollationFetch { +impl CollationFetch { /// Create a new collation fetcher for the given chain. - pub fn new(parachain: ParaId, relay_parent: BlockId, relay_parent_hash: Hash, collators: C, client: Arc

) -> Self { + pub fn new( + parachain: ParaId, + relay_parent_hash: Hash, + collators: C, + client: Arc

, + incoming: Incoming, + ) -> Self { CollationFetch { + relay_parent: BlockId::hash(relay_parent_hash), relay_parent_hash, - relay_parent, collators, client, parachain, live_fetch: None, + incoming, } } @@ -80,6 +89,11 @@ impl CollationFetch { pub fn relay_parent(&self) -> Hash { self.relay_parent_hash } + + /// Access the local parachain ID. + pub fn parachain(&self) -> ParaId { + self.parachain + } } impl Future for CollationFetch @@ -100,7 +114,7 @@ impl Future for CollationFetch try_ready!(poll) }; - match validate_collation(&*self.client, &self.relay_parent, &x) { + match validate_collation(&*self.client, &self.relay_parent, &x, &self.incoming) { Ok(e) => { return Ok(Async::Ready((x, e))) } @@ -148,14 +162,39 @@ error_chain! { } } -/// Compute the egress trie root for a set of messages. -pub fn egress_trie_root>(messages: I) -> Hash +/// Compute a trie root for a set of messages. +pub fn message_queue_root>(messages: I) -> Hash where A: AsRef<[u8]> { ::trie::ordered_trie_root::(messages) } -fn check_and_compute_extrinsic( +/// Compute the set of egress roots for all given outgoing messages. +pub fn egress_roots(mut outgoing: Vec) -> Vec<(ParaId, Hash)> { + // stable sort messages by parachain ID. + outgoing.sort_by_key(|msg| ParaId::from(msg.target)); + + let mut egress_roots = Vec::new(); + { + let mut messages_iter = outgoing.iter().peekable(); + while let Some(batch_target) = messages_iter.peek().map(|o| o.target) { + // we borrow the iterator mutably to ensure it advances so the + // next iteration of the loop starts with `messages_iter` pointing to + // the next batch. + let messages_to = messages_iter + .clone() + .take_while(|o| o.target == batch_target) + .map(|o| { let _ = messages_iter.next(); &o.data[..] }); + + let computed_root = message_queue_root(messages_to); + egress_roots.push((batch_target, computed_root)); + } + } + + egress_roots +} + +fn check_extrinsic( mut outgoing: Vec, expected_egress_roots: &[(ParaId, Hash)], ) -> Result { @@ -183,7 +222,7 @@ fn check_and_compute_extrinsic( .take_while(|o| o.target == batch_target) .map(|o| { let _ = messages_iter.next(); &o.data[..] }); - let computed_root = egress_trie_root(messages_to); + let computed_root = message_queue_root(messages_to); if &computed_root != expected_root { return Err(ErrorKind::EgressRootMismatch( batch_target, @@ -231,7 +270,7 @@ impl Externalities { self, candidate: &CandidateReceipt, ) -> Result { - check_and_compute_extrinsic( + check_extrinsic( self.outgoing, &candidate.egress_queue_roots[..], ) @@ -242,15 +281,17 @@ impl Externalities { /// /// This assumes that basic validity checks have been done: /// - Block data hash is the same as linked in candidate receipt. +/// - incoming messages have been validated against canonical ingress roots pub fn validate_collation

( client: &P, relay_parent: &BlockId, - collation: &Collation + collation: &Collation, + incoming: &Incoming, ) -> Result where P: ProvideRuntimeApi, P::Api: ParachainHost, { - use parachain::ValidationParams; + use parachain::{IncomingMessage, ValidationParams}; let api = client.runtime_api(); let para_id = collation.receipt.parachain_index; @@ -263,6 +304,15 @@ pub fn validate_collation

( let params = ValidationParams { parent_head: chain_head, block_data: collation.block_data.0.clone(), + ingress: incoming.iter() + .flat_map(|&(para_id, ref messages)| { + let source: u32 = para_id.into(); + messages.iter().map(move |msg| IncomingMessage { + source, + data: msg.0.clone(), + }) + }) + .collect() }; let mut ext = Externalities { @@ -291,7 +341,7 @@ mod tests { use parachain::wasm_executor::Externalities as ExternalitiesTrait; #[test] - fn egress_roots() { + fn compute_and_check_egress() { let messages = vec![ OutgoingMessage { target: 3.into(), data: vec![1, 1, 1] }, OutgoingMessage { target: 1.into(), data: vec![1, 2, 3] }, @@ -299,29 +349,36 @@ mod tests { OutgoingMessage { target: 1.into(), data: vec![7, 8, 9] }, ]; - let root_1 = egress_trie_root(&[vec![1, 2, 3], vec![7, 8, 9]]); - let root_2 = egress_trie_root(&[vec![4, 5, 6]]); - let root_3 = egress_trie_root(&[vec![1, 1, 1]]); + let root_1 = message_queue_root(&[vec![1, 2, 3], vec![7, 8, 9]]); + let root_2 = message_queue_root(&[vec![4, 5, 6]]); + let root_3 = message_queue_root(&[vec![1, 1, 1]]); - assert!(check_and_compute_extrinsic( + assert!(check_extrinsic( messages.clone(), &[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3)], ).is_ok()); + let egress_roots = egress_roots(messages.clone()); + + assert!(check_extrinsic( + messages.clone(), + &egress_roots[..], + ).is_ok()); + // missing root. - assert!(check_and_compute_extrinsic( + assert!(check_extrinsic( messages.clone(), &[(1.into(), root_1), (3.into(), root_3)], ).is_err()); // extra root. - assert!(check_and_compute_extrinsic( + assert!(check_extrinsic( messages.clone(), &[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3), (4.into(), Default::default())], ).is_err()); // root mismatch. - assert!(check_and_compute_extrinsic( + assert!(check_extrinsic( messages.clone(), &[(1.into(), root_2), (2.into(), root_1), (3.into(), root_3)], ).is_err()); diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index b4219e2e83ca4..0722675b42eb7 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -77,13 +77,11 @@ use parking_lot::Mutex; use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, SessionKey}; use polkadot_primitives::parachain::{ Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt, - CandidateSignature -}; -use polkadot_primitives::parachain::{ - AttestedCandidate, ParachainHost, Statement as PrimitiveStatement + CandidateSignature, ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, + OutgoingMessage, }; use primitives::{Ed25519AuthorityId as AuthorityId, ed25519}; -use runtime_primitives::{traits::ProvideRuntimeApi, ApplyError}; +use runtime_primitives::{traits::{ProvideRuntimeApi, Header as HeaderT}, ApplyError}; use tokio::runtime::TaskExecutor; use tokio::timer::{Delay, Interval}; use transaction_pool::txpool::{Pool, ChainApi as PoolChainApi}; @@ -97,11 +95,11 @@ use inherents::InherentData; use runtime_aura::timestamp::TimestampInherentData; use aura::SlotDuration; -pub use self::collation::{validate_collation, egress_trie_root, Collators}; +pub use self::collation::{validate_collation, message_queue_root, egress_roots, Collators}; pub use self::error::{ErrorKind, Error}; pub use self::shared_table::{ SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement, - GenericStatement + GenericStatement, }; mod attestation_service; @@ -115,14 +113,40 @@ pub mod collation; // block size limit. const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; +/// Incoming messages; a series of sorted (ParaId, Message) pairs. +pub type Incoming = Vec<(ParaId, Vec)>; + +/// Outgoing messages from various candidates. +pub type Outgoing = Vec; + +/// Some messages from a parachain. +pub struct MessagesFrom { + /// The parachain originating the messages. + pub from: ParaId, + /// The messages themselves. + pub messages: ParachainExtrinsic, +} + +impl MessagesFrom { + /// Construct from the raw messages. + pub fn from_messages(from: ParaId, messages: Vec) -> Self { + MessagesFrom { + from, + messages: ParachainExtrinsic { outgoing_messages: messages }, + } + } +} + /// A handle to a statement table router. /// /// This is expected to be a lightweight, shared type like an `Arc`. pub trait TableRouter: Clone { /// Errors when fetching data from the network. - type Error; + type Error: std::fmt::Debug; /// Future that resolves when candidate data is fetched. type FetchCandidate: IntoFuture; + /// Fetch incoming messages for a candidate. + type FetchIncoming: IntoFuture; /// Call with local candidate data. This will make the data available on the network, /// and sign, import, and broadcast a statement about the candidate. @@ -130,6 +154,14 @@ pub trait TableRouter: Clone { /// Fetch block data for a specific candidate. fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate; + + /// Fetches the incoming message data to a parachain from the network. Incoming data should be + /// checked. + /// + /// The `ParachainHost::ingress` function can be used to fetch incoming roots, + /// and the `message_queue_root` function can be used to check that messages actually have + /// expected root. + fn fetch_incoming(&self, id: ParaId) -> Self::FetchIncoming; } /// A long-lived network which can create parachain statement and BFT message routing processes on demand. @@ -138,12 +170,12 @@ pub trait Network { /// routing statements to peers, and driving completion of any `StatementProducers`. type TableRouter: TableRouter; - /// Instantiate a table router using the given shared table and task executor. + /// Instantiate a table router using the given shared table. + /// Also pass through any outgoing messages to be broadcast to peers. fn communication_for( &self, - validators: &[SessionKey], table: Arc, - task_executor: TaskExecutor + outgoing: Outgoing, ) -> Self::TableRouter; } @@ -179,7 +211,12 @@ pub fn check_statement(statement: &Statement, signature: &CandidateSignature, si signature.verify(&encoded[..], &signer.into()) } -fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: AuthorityId) -> Result<(HashMap, LocalDuty), Error> { +/// Compute group info out of a duty roster and a local authority set. +pub fn make_group_info( + roster: DutyRoster, + authorities: &[AuthorityId], + local_id: AuthorityId, +) -> Result<(HashMap, LocalDuty), Error> { if roster.validator_duty.len() != authorities.len() { bail!(ErrorKind::InvalidDutyRosterLength(authorities.len(), roster.validator_duty.len())) } @@ -232,25 +269,31 @@ struct ParachainConsensus { handle: TaskExecutor, /// Store for extrinsic data. extrinsic_store: ExtrinsicStore, - /// Live agreements. + /// Live agreements. Maps relay chain parent hashes to attestation + /// instances. live_instances: Mutex>>, } impl ParachainConsensus where C: Collators + Send + 'static, N: Network, - P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, + P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static, P::Api: ParachainHost + BlockBuilderApi, ::Future: Send + 'static, N::TableRouter: Send + 'static, + <::FetchIncoming as IntoFuture>::Future: Send + 'static, { /// Get an attestation table for given parent hash. /// - /// This starts a parachain agreement process for given parent hash if + /// This starts a parachain agreement process on top of the parent hash if /// one has not already started. + /// + /// Additionally, this will trigger broadcast of data to the new block's duty + /// roster. fn get_or_instantiate( &self, parent_hash: Hash, + grandparent_hash: Hash, authorities: &[AuthorityId], sign_with: Arc, ) @@ -262,6 +305,29 @@ impl ParachainConsensus where } let id = BlockId::hash(parent_hash); + + // compute the parent candidates, if we know of them. + // this will allow us to circulate outgoing messages to other peers as necessary. + let parent_candidates: Vec<_> = ::attestation_service::fetch_candidates(&*self.client, &id) + .ok() + .and_then(|x| x) + .map(|x| x.collect()) + .unwrap_or_default(); + + let outgoing: Vec<_> = { + // extract all extrinsic data that we have and propagate to peers. + live_instances.get(&grandparent_hash).map(|parent_consensus| { + parent_candidates.iter().filter_map(|c| { + let para_id = c.parachain_index; + let hash = c.hash(); + parent_consensus.table.extrinsic_data(&hash).map(|ex| MessagesFrom { + from: para_id, + messages: ex, + }) + }).collect() + }).unwrap_or_default() + }; + let duty_roster = self.client.runtime_api().duty_roster(&id)?; let (group_info, local_duty) = make_group_info( @@ -279,31 +345,19 @@ impl ParachainConsensus where let table = Arc::new(SharedTable::new(group_info, sign_with.clone(), parent_hash, self.extrinsic_store.clone())); let router = self.network.communication_for( - authorities, table.clone(), - self.handle.clone() + outgoing, ); - let validation_para = match local_duty.validation { + let drop_signal = match local_duty.validation { + Chain::Parachain(id) => Some(self.launch_work( + parent_hash, + id, + router, + )), Chain::Relay => None, - Chain::Parachain(id) => Some(id), }; - let collation_work = validation_para.map(|para| CollationFetch::new( - para, - id.clone(), - parent_hash.clone(), - self.collators.clone(), - self.client.clone(), - )); - - let drop_signal = dispatch_collation_work( - router.clone(), - &self.handle, - collation_work, - self.extrinsic_store.clone(), - ); - let tracker = Arc::new(AttestationTracker { table, started: Instant::now(), @@ -319,11 +373,80 @@ impl ParachainConsensus where fn retain bool>(&self, mut pred: F) { self.live_instances.lock().retain(|k, _| pred(k)) } + + // launch parachain work asynchronously. + fn launch_work( + &self, + relay_parent: Hash, + validation_para: ParaId, + router: N::TableRouter, + ) -> exit_future::Signal { + use extrinsic_store::Data; + + let (signal, exit) = exit_future::signal(); + + let fetch_incoming = router.fetch_incoming(validation_para) + .into_future() + .map_err(|e| format!("{:?}", e)); + + // fetch incoming messages to our parachain from network and + // then fetch a local collation. + let (collators, client) = (self.collators.clone(), self.client.clone()); + let collation_work = fetch_incoming + .map_err(|e| String::clone(&e)) + .and_then(move |incoming| { + CollationFetch::new( + validation_para, + relay_parent, + collators, + client, + incoming, + ).map_err(|e| format!("{:?}", e)) + }); + + let extrinsic_store = self.extrinsic_store.clone(); + let handled_work = collation_work.then(move |result| match result { + Ok((collation, extrinsic)) => { + let res = extrinsic_store.make_available(Data { + relay_parent, + parachain_id: collation.receipt.parachain_index, + candidate_hash: collation.receipt.hash(), + block_data: collation.block_data.clone(), + extrinsic: Some(extrinsic.clone()), + }); + + match res { + Ok(()) => { + // TODO: https://github.com/paritytech/polkadot/issues/51 + // Erasure-code and provide merkle branches. + router.local_candidate(collation.receipt, collation.block_data, extrinsic) + } + Err(e) => warn!( + target: "consensus", + "Failed to make collation data available: {:?}", + e, + ), + } + + Ok(()) + } + Err(e) => { + warn!(target: "consensus", "Failed to collate candidate: {}", e); + Ok(()) + } + }); + + let cancellable_work = handled_work.select(exit).then(|_| Ok(())); + + // spawn onto thread pool. + self.handle.spawn(cancellable_work); + signal + } } /// Parachain consensus for a single block. struct AttestationTracker { - _drop_signal: exit_future::Signal, + _drop_signal: Option, table: Arc, started: Instant, } @@ -345,6 +468,7 @@ impl ProposerFactory where P::Api: ParachainHost + Core + BlockBuilderApi, N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, + <::FetchIncoming as IntoFuture>::Future: Send + 'static, TxApi: PoolChainApi, { /// Create a new proposer factory. @@ -389,10 +513,11 @@ impl consensus::Environment for ProposerFactory, - P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, + P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static, P::Api: ParachainHost + BlockBuilderApi, ::Future: Send + 'static, N::TableRouter: Send + 'static, + <::FetchIncoming as IntoFuture>::Future: Send + 'static, { type Proposer = Proposer; type Error = Error; @@ -407,6 +532,7 @@ impl consensus::Environment for ProposerFactory consensus::Environment for ProposerFactory( - router: R, - handle: &TaskExecutor, - work: Option>, - extrinsic_store: ExtrinsicStore, -) -> exit_future::Signal where - C: Collators + Send + 'static, - P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, - P::Api: ParachainHost, - ::Future: Send + 'static, - R: TableRouter + Send + 'static, -{ - use extrinsic_store::Data; - - let (signal, exit) = exit_future::signal(); - - let work = match work { - Some(w) => w, - None => return signal, - }; - - let relay_parent = work.relay_parent(); - let handled_work = work.then(move |result| match result { - Ok((collation, extrinsic)) => { - let res = extrinsic_store.make_available(Data { - relay_parent, - parachain_id: collation.receipt.parachain_index, - candidate_hash: collation.receipt.hash(), - block_data: collation.block_data.clone(), - extrinsic: Some(extrinsic.clone()), - }); - - match res { - Ok(()) => { - // TODO: https://github.com/paritytech/polkadot/issues/51 - // Erasure-code and provide merkle branches. - router.local_candidate(collation.receipt, collation.block_data, extrinsic) - } - Err(e) => - warn!(target: "consensus", "Failed to make collation data available: {:?}", e), - } - - Ok(()) - } - Err(_e) => { - warn!(target: "consensus", "Failed to collate candidate"); - Ok(()) - } - }); - - let cancellable_work = handled_work.select(exit).then(|_| Ok(())); - - // spawn onto thread pool. - handle.spawn(cancellable_work); - signal -} - -struct LocalDuty { +/// The local duty of a validator. +pub struct LocalDuty { validation: Chain, } diff --git a/consensus/src/shared_table/mod.rs b/consensus/src/shared_table/mod.rs index 8a7967ea76466..829ad76642748 100644 --- a/consensus/src/shared_table/mod.rs +++ b/consensus/src/shared_table/mod.rs @@ -17,7 +17,7 @@ //! Parachain statement table meant to be shared with a message router //! and a consensus proposer. -use std::collections::{HashMap, HashSet}; +use std::collections::hash_map::{HashMap, Entry}; use std::sync::Arc; use extrinsic_store::{Data, Store as ExtrinsicStore}; @@ -29,9 +29,9 @@ use polkadot_primitives::parachain::{ }; use parking_lot::Mutex; -use futures::prelude::*; +use futures::{future, prelude::*}; -use super::{GroupInfo, TableRouter}; +use super::{GroupInfo, Incoming, TableRouter}; use self::includable::IncludabilitySender; use primitives::ed25519; use runtime_primitives::{traits::ProvideRuntimeApi}; @@ -74,13 +74,40 @@ impl TableContext { } } +pub(crate) enum Validation { + Valid(BlockData, Extrinsic), + Invalid(BlockData), // should take proof. +} + +enum ValidationWork { + Done(Validation), + InProgress, + Error(String), +} + +#[cfg(test)] +impl ValidationWork { + fn is_in_progress(&self) -> bool { + match *self { + ValidationWork::InProgress => true, + _ => false, + } + } + + fn is_done(&self) -> bool { + match *self { + ValidationWork::Done(_) => true, + _ => false, + } + } +} + // A shared table object. struct SharedTableInner { table: Table, - proposed_digest: Option, - checked_validity: HashSet, trackers: Vec, extrinsic_store: ExtrinsicStore, + validated: HashMap, } impl SharedTableInner { @@ -94,9 +121,10 @@ impl SharedTableInner { context: &TableContext, router: &R, statement: table::SignedStatement, - ) -> Option Option::Future, - >> { + ::Future, + >>> { let summary = match self.table.import_statement(context, statement) { Some(summary) => summary, None => return None, @@ -111,20 +139,34 @@ impl SharedTableInner { let digest = &summary.candidate; // TODO: consider a strategy based on the number of candidate votes as well. - // only check validity if this wasn't locally proposed. - let extra_work = para_member - && self.proposed_digest.as_ref().map_or(true, |d| d != digest) - && self.checked_validity.insert(digest.clone()); + let do_validation = para_member && match self.validated.entry(digest.clone()) { + Entry::Occupied(_) => false, + Entry::Vacant(entry) => { + entry.insert(ValidationWork::InProgress); + true + } + }; - let work = if extra_work { + let work = if do_validation { + let fetch_incoming = router.fetch_incoming(summary.group_id); match self.table.get_candidate(&digest) { - None => None, // TODO: handle table inconsistency somehow? + None => { + let message = format!( + "Table inconsistency detected. Summary returned for candidate {} \ + but receipt not present in table.", + digest, + ); + + warn!(target: "consensus", "{}", message); + self.validated.insert(digest.clone(), ValidationWork::Error(message)); + None + } Some(candidate) => { let fetch_block_data = router.fetch_block_data(candidate).into_future(); Some(Work { candidate_receipt: candidate.clone(), - fetch_block_data, + fetch: fetch_block_data.join(fetch_incoming), }) } } @@ -152,37 +194,83 @@ impl SharedTableInner { /// Produced after validating a candidate. pub struct Validated { /// A statement about the validity of the candidate. - pub validity: table::Statement, - /// Block data to ensure availability of. - pub block_data: BlockData, - /// Extrinsic data to ensure availability of. - pub extrinsic: Option, + statement: table::Statement, + /// The result of validation. + result: Validation, +} + +impl Validated { + /// Note that we've validated a candidate with given hash and it is bad. + pub fn known_bad(hash: Hash, block_data: BlockData) -> Self { + Validated { + statement: GenericStatement::Invalid(hash), + result: Validation::Invalid(block_data), + } + } + + /// Note that we've validated a candidate with given hash and it is good. + /// Extrinsic data required. + pub fn known_good(hash: Hash, block_data: BlockData, extrinsic: Extrinsic) -> Self { + Validated { + statement: GenericStatement::Valid(hash), + result: Validation::Valid(block_data, extrinsic), + } + } + + /// Note that we've collated a candidate. + /// Extrinsic data required. + pub fn collated_local( + receipt: CandidateReceipt, + block_data: BlockData, + extrinsic: Extrinsic, + ) -> Self { + Validated { + statement: GenericStatement::Candidate(receipt), + result: Validation::Valid(block_data, extrinsic), + } + } + + /// Get a reference to the block data. + pub fn block_data(&self) -> &BlockData { + match self.result { + Validation::Valid(ref b, _) | Validation::Invalid(ref b) => b, + } + } + + /// Get a reference to the extrinsic data, if any. + pub fn extrinsic(&self) -> Option<&Extrinsic> { + match self.result { + Validation::Valid(_, ref ex) => Some(ex), + Validation::Invalid(_) => None, + } + } } /// Future that performs parachain validation work. -pub struct ParachainWork { - work: Work, +pub struct ParachainWork { + work: Work, relay_parent: Hash, extrinsic_store: ExtrinsicStore, } -impl ParachainWork { +impl ParachainWork { /// Prime the parachain work with an API reference for extracting /// chain information. pub fn prime(self, api: Arc

) -> PrimedParachainWork< - D, - impl Send + FnMut(&BlockId, &Collation) -> Result, + Fetch, + impl Send + FnMut(&BlockId, &Collation, &Incoming) -> Result, > where P: Send + Sync + 'static, P::Api: ParachainHost, { - let validate = move |id: &_, collation: &_| { + let validate = move |id: &_, collation: &_, incoming: &_| { let res = ::collation::validate_collation( &*api, id, collation, + incoming, ); match res { @@ -198,28 +286,28 @@ impl ParachainWork { } /// Prime the parachain work with a custom validation function. - pub fn prime_with(self, validate: F) -> PrimedParachainWork - where F: FnMut(&BlockId, &Collation) -> Result + pub fn prime_with(self, validate: F) -> PrimedParachainWork + where F: FnMut(&BlockId, &Collation, &Incoming) -> Result { PrimedParachainWork { inner: self, validate } } } -struct Work { +struct Work { candidate_receipt: CandidateReceipt, - fetch_block_data: D, + fetch: Fetch } /// Primed statement producer. -pub struct PrimedParachainWork { - inner: ParachainWork, +pub struct PrimedParachainWork { + inner: ParachainWork, validate: F, } -impl Future for PrimedParachainWork +impl Future for PrimedParachainWork where - D: Future, - F: FnMut(&BlockId, &Collation) -> Result, + Fetch: Future, + F: FnMut(&BlockId, &Collation, &Incoming) -> Result, Err: From<::std::io::Error>, { type Item = Validated; @@ -229,10 +317,11 @@ impl Future for PrimedParachainWork let work = &mut self.inner.work; let candidate = &work.candidate_receipt; - let block = try_ready!(work.fetch_block_data.poll()); + let (block, incoming) = try_ready!(work.fetch.poll()); let validation_res = (self.validate)( &BlockId::hash(self.inner.relay_parent), &Collation { block_data: block.clone(), receipt: candidate.clone() }, + &incoming, ); let candidate_hash = candidate.hash(); @@ -240,8 +329,11 @@ impl Future for PrimedParachainWork debug!(target: "consensus", "Making validity statement about candidate {}: is_good? {:?}", candidate_hash, validation_res.is_ok()); - let (extrinsic, validity_statement) = match validation_res { - Err(()) => (None, GenericStatement::Invalid(candidate_hash)), + let (validity_statement, result) = match validation_res { + Err(()) => ( + GenericStatement::Invalid(candidate_hash), + Validation::Invalid(block), + ), Ok(extrinsic) => { self.inner.extrinsic_store.make_available(Data { relay_parent: self.inner.relay_parent, @@ -251,14 +343,16 @@ impl Future for PrimedParachainWork extrinsic: Some(extrinsic.clone()), })?; - (Some(extrinsic), GenericStatement::Valid(candidate_hash)) + ( + GenericStatement::Valid(candidate_hash), + Validation::Valid(block, extrinsic) + ) } }; Ok(Async::Ready(Validated { - validity: validity_statement, - block_data: block, - extrinsic, + statement: validity_statement, + result, })) } } @@ -293,8 +387,7 @@ impl SharedTable { context: Arc::new(TableContext { groups, key, parent_hash }), inner: Arc::new(Mutex::new(SharedTableInner { table: Table::default(), - proposed_digest: None, - checked_validity: HashSet::new(), + validated: HashMap::new(), trackers: Vec::new(), extrinsic_store, })) @@ -316,6 +409,19 @@ impl SharedTable { &self.context.groups } + /// Get extrinsic data for candidate with given hash, if any. + /// + /// This will return `Some` for any candidates that have been validated + /// locally. + pub(crate) fn extrinsic_data(&self, hash: &Hash) -> Option { + self.inner.lock().validated.get(hash).and_then(|x| match *x { + ValidationWork::Error(_) => None, + ValidationWork::InProgress => None, + ValidationWork::Done(Validation::Invalid(_)) => None, + ValidationWork::Done(Validation::Valid(_, ref ex)) => Some(ex.clone()), + }) + } + /// Import a single statement with remote source, whose signature has already been checked. /// /// The statement producer, if any, will produce only statements concerning the same candidate @@ -324,9 +430,10 @@ impl SharedTable { &self, router: &R, statement: table::SignedStatement, - ) -> Option Option::Future, - >> { + ::Future, + >>> { self.inner.lock().import_remote_statement(&*self.context, router, statement) } @@ -340,9 +447,10 @@ impl SharedTable { where R: TableRouter, I: IntoIterator, - U: ::std::iter::FromIterator::Future, - >>>, + ::Future, + >>>>, { let mut inner = self.inner.lock(); @@ -351,23 +459,20 @@ impl SharedTable { }).collect() } - /// Sign and import a local statement. - pub fn sign_and_import(&self, statement: table::Statement) + /// Sign and import the result of candidate validation. + pub fn import_validated(&self, validated: Validated) -> SignedStatement { - let proposed_digest = match statement { - GenericStatement::Candidate(ref c) => Some(c.hash()), - _ => None, + let digest = match validated.statement { + GenericStatement::Candidate(ref c) => c.hash(), + GenericStatement::Valid(h) | GenericStatement::Invalid(h) => h, }; - let signed_statement = self.context.sign_statement(statement); + let signed_statement = self.context.sign_statement(validated.statement); let mut inner = self.inner.lock(); - if proposed_digest.is_some() { - inner.proposed_digest = proposed_digest; - } - inner.table.import_statement(&*self.context, signed_statement.clone()); + inner.validated.insert(digest, ValidationWork::Done(validated.result)); signed_statement } @@ -447,13 +552,19 @@ mod tests { impl TableRouter for DummyRouter { type Error = ::std::io::Error; type FetchCandidate = ::futures::future::FutureResult; + type FetchIncoming = ::futures::future::FutureResult; fn local_candidate(&self, _candidate: CandidateReceipt, _block_data: BlockData, _extrinsic: Extrinsic) { } + fn fetch_block_data(&self, _candidate: &CandidateReceipt) -> Self::FetchCandidate { future::ok(BlockData(vec![1, 2, 3, 4, 5])) } + + fn fetch_incoming(&self, _para_id: ParaId) -> Self::FetchIncoming { + future::ok(Vec::new()) + } } #[test] @@ -579,18 +690,18 @@ mod tests { let producer: ParachainWork> = ParachainWork { work: Work { candidate_receipt: candidate, - fetch_block_data: future::ok(block_data.clone()), + fetch: future::ok((block_data.clone(), Vec::new())), }, relay_parent, extrinsic_store: store.clone(), }; - let produced = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) + let validated = producer.prime_with(|_, _, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) .wait() .unwrap(); - assert_eq!(produced.block_data, block_data); - assert_eq!(produced.validity, GenericStatement::Valid(hash)); + assert_eq!(validated.block_data(), &block_data); + assert_eq!(validated.statement, GenericStatement::Valid(hash)); assert_eq!(store.block_data(relay_parent, hash).unwrap(), block_data); assert!(store.extrinsic(relay_parent, hash).is_some()); @@ -619,19 +730,132 @@ mod tests { let producer = ParachainWork { work: Work { candidate_receipt: candidate, - fetch_block_data: future::ok::<_, ::std::io::Error>(block_data.clone()), + fetch: future::ok::<_, ::std::io::Error>((block_data.clone(), Vec::new())), }, relay_parent, extrinsic_store: store.clone(), }; - let produced = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) + let validated = producer.prime_with(|_, _, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) .wait() .unwrap(); - assert_eq!(produced.block_data, block_data); + assert_eq!(validated.block_data(), &block_data); assert_eq!(store.block_data(relay_parent, hash).unwrap(), block_data); assert!(store.extrinsic(relay_parent, hash).is_some()); } + + #[test] + fn does_not_dispatch_work_after_starting_validation() { + let mut groups = HashMap::new(); + + let para_id = ParaId::from(1); + let local_id = Keyring::Alice.to_raw_public().into(); + let local_key = Arc::new(Keyring::Alice.pair()); + + let validity_other = Keyring::Bob.to_raw_public().into(); + let validity_other_key = Keyring::Bob.pair(); + let parent_hash = Default::default(); + + groups.insert(para_id, GroupInfo { + validity_guarantors: [local_id, validity_other].iter().cloned().collect(), + needed_validity: 1, + }); + + let shared_table = SharedTable::new( + groups, + local_key.clone(), + parent_hash, + ExtrinsicStore::new_in_memory(), + ); + + let candidate = CandidateReceipt { + parachain_index: para_id, + collator: [1; 32].into(), + signature: Default::default(), + head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), + balance_uploads: Vec::new(), + egress_queue_roots: Vec::new(), + fees: 1_000_000, + block_data_hash: [2; 32].into(), + }; + + let hash = candidate.hash(); + let candidate_statement = GenericStatement::Candidate(candidate); + + let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash); + let signed_statement = ::table::generic::SignedStatement { + statement: candidate_statement, + signature: signature.into(), + sender: validity_other, + }; + + let _a = shared_table.import_remote_statement( + &DummyRouter, + signed_statement.clone(), + ).expect("should produce work"); + + assert!(shared_table.inner.lock().validated.get(&hash).expect("validation has started").is_in_progress()); + + let b = shared_table.import_remote_statement( + &DummyRouter, + signed_statement.clone(), + ); + + assert!(b.is_none(), "cannot work when validation has started"); + } + + #[test] + fn does_not_dispatch_after_local_candidate() { + let mut groups = HashMap::new(); + + let para_id = ParaId::from(1); + let local_id = Keyring::Alice.to_raw_public().into(); + let local_key = Arc::new(Keyring::Alice.pair()); + let block_data = BlockData(vec![1, 2, 3]); + let extrinsic = Extrinsic { outgoing_messages: Vec::new() }; + + let validity_other = Keyring::Bob.to_raw_public().into(); + let parent_hash = Default::default(); + + groups.insert(para_id, GroupInfo { + validity_guarantors: [local_id, validity_other].iter().cloned().collect(), + needed_validity: 1, + }); + + let shared_table = SharedTable::new( + groups, + local_key.clone(), + parent_hash, + ExtrinsicStore::new_in_memory(), + ); + + let candidate = CandidateReceipt { + parachain_index: para_id, + collator: [1; 32].into(), + signature: Default::default(), + head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), + balance_uploads: Vec::new(), + egress_queue_roots: Vec::new(), + fees: 1_000_000, + block_data_hash: [2; 32].into(), + }; + + let hash = candidate.hash(); + let signed_statement = shared_table.import_validated(Validated::collated_local( + candidate, + block_data, + extrinsic, + )); + + assert!(shared_table.inner.lock().validated.get(&hash).expect("validation has started").is_done()); + + let a = shared_table.import_remote_statement( + &DummyRouter, + signed_statement, + ); + + assert!(a.is_none()); + } } diff --git a/network/Cargo.toml b/network/Cargo.toml index 17fad6c193630..3ab6274dc1e18 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -18,4 +18,8 @@ sr-primitives = { git = "https://github.com/paritytech/substrate" } futures = "0.1" tokio = "0.1.7" log = "0.4" -rhododendron = "0.3" +slice-group-by = "0.2.2" + +[dev-dependencies] +substrate-client = { git = "https://github.com/paritytech/substrate" } +substrate-keyring = { git = "https://github.com/paritytech/substrate" } diff --git a/network/src/consensus.rs b/network/src/consensus.rs index da3a27ddcf91d..3f62995aff30f 100644 --- a/network/src/consensus.rs +++ b/network/src/consensus.rs @@ -20,13 +20,14 @@ //! each time consensus begins on a new chain head. use sr_primitives::traits::ProvideRuntimeApi; -use substrate_network::consensus_gossip::ConsensusMessage; -use polkadot_consensus::{Network, SharedTable, Collators, Statement, GenericStatement}; +use substrate_network::{consensus_gossip::ConsensusMessage, Context as NetContext}; +use polkadot_consensus::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement}; use polkadot_primitives::{AccountId, Block, Hash, SessionKey}; use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData}; use codec::Decode; use futures::prelude::*; +use futures::future::Executor as FutureExecutor; use futures::sync::mpsc; use std::collections::HashMap; @@ -36,22 +37,95 @@ use arrayvec::ArrayVec; use tokio::runtime::TaskExecutor; use parking_lot::Mutex; -use super::NetworkService; use router::Router; +use super::PolkadotProtocol; + +/// An executor suitable for dispatching async consensus tasks. +pub trait Executor { + fn spawn + Send + 'static>(&self, f: F); +} + +/// A wrapped futures::future::Executor. +pub struct WrappedExecutor(pub T); + +impl Executor for WrappedExecutor + where T: FutureExecutor + Send + 'static>> +{ + fn spawn + Send + 'static>(&self, f: F) { + if let Err(e) = self.0.execute(Box::new(f)) { + warn!(target: "consensus", "could not spawn consensus task: {:?}", e); + } + } +} + +impl Executor for TaskExecutor { + fn spawn + Send + 'static>(&self, f: F) { + TaskExecutor::spawn(self, f) + } +} + +/// Basic functionality that a network has to fulfill. +pub trait NetworkService: Send + Sync + 'static { + /// Get a stream of gossip messages for a given hash. + fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver; + + /// Gossip a message on given topic. + fn gossip_message(&self, topic: Hash, message: Vec); + + /// Drop a gossip topic. + fn drop_gossip(&self, topic: Hash); + + /// Execute a closure with the polkadot protocol. + fn with_spec(&self, with: F) + where F: FnOnce(&mut PolkadotProtocol, &mut NetContext); +} + +impl NetworkService for super::NetworkService { + fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver { + let (tx, rx) = std::sync::mpsc::channel(); + + self.with_gossip(move |gossip, _| { + let inner_rx = gossip.messages_for(topic); + let _ = tx.send(inner_rx); + }); + + match rx.recv() { + Ok(rx) => rx, + Err(_) => mpsc::unbounded().1, // return empty channel. + } + } + + fn gossip_message(&self, topic: Hash, message: Vec) { + self.gossip_consensus_message(topic, message, false); + } + + fn drop_gossip(&self, topic: Hash) { + self.with_gossip(move |gossip, _| { + gossip.collect_garbage_for_topic(topic); + }) + } + + fn with_spec(&self, with: F) + where F: FnOnce(&mut PolkadotProtocol, &mut NetContext) + { + super::NetworkService::with_spec(self, with) + } +} // task that processes all gossipped consensus messages, // checking signatures -struct MessageProcessTask { +struct MessageProcessTask { inner_stream: mpsc::UnboundedReceiver, parent_hash: Hash, - table_router: Router

, - exit: E, + table_router: Router, } -impl MessageProcessTask where +impl MessageProcessTask where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, E: Future + Clone + Send + 'static, + N: NetworkService, + T: Clone + Executor + Send + 'static, { fn process_message(&self, msg: ConsensusMessage) -> Option> { use polkadot_consensus::SignedStatement; @@ -64,7 +138,7 @@ impl MessageProcessTask where statement.sender, &self.parent_hash ) { - self.table_router.import_statement(statement, self.exit.clone()); + self.table_router.import_statement(statement); } } @@ -72,10 +146,12 @@ impl MessageProcessTask where } } -impl Future for MessageProcessTask where +impl Future for MessageProcessTask where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, E: Future + Clone + Send + 'static, + N: NetworkService, + T: Clone + Executor + Send + 'static, { type Item = (); type Error = (); @@ -95,43 +171,45 @@ impl Future for MessageProcessTask where } /// Wrapper around the network service -pub struct ConsensusNetwork { - network: Arc, +pub struct ConsensusNetwork { + network: Arc, api: Arc

, + executor: T, exit: E, } -impl ConsensusNetwork { +impl ConsensusNetwork { /// Create a new consensus networking object. - pub fn new(network: Arc, exit: E, api: Arc

) -> Self { - ConsensusNetwork { network, exit, api } + pub fn new(network: Arc, exit: E, api: Arc

, executor: T) -> Self { + ConsensusNetwork { network, exit, api, executor } } } -impl Clone for ConsensusNetwork { +impl Clone for ConsensusNetwork { fn clone(&self) -> Self { ConsensusNetwork { network: self.network.clone(), exit: self.exit.clone(), api: self.api.clone(), + executor: self.executor.clone(), } } } /// A long-lived network which can create parachain statement routing processes on demand. -impl Network for ConsensusNetwork where +impl ParachainNetwork for ConsensusNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, E: Clone + Future + Send + 'static, + N: NetworkService, + T: Clone + Executor + Send + 'static, { - type TableRouter = Router

; + type TableRouter = Router; - /// Instantiate a table router using the given shared table. fn communication_for( &self, - _validators: &[SessionKey], table: Arc, - task_executor: TaskExecutor, + outgoing: polkadot_consensus::Outgoing, ) -> Self::TableRouter { let parent_hash = table.consensus_parent_hash().clone(); @@ -142,41 +220,34 @@ impl Network for ConsensusNetwork where table, self.network.clone(), self.api.clone(), - task_executor.clone(), + self.executor.clone(), parent_hash, knowledge.clone(), + self.exit.clone(), ); - let attestation_topic = table_router.gossip_topic(); - let exit = self.exit.clone(); + table_router.broadcast_egress(outgoing); - let (tx, rx) = std::sync::mpsc::channel(); - self.network.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(attestation_topic); - let _ = tx.send(inner_rx); - }); + let attestation_topic = table_router.gossip_topic(); let table_router_clone = table_router.clone(); - let executor = task_executor.clone(); + let executor = self.executor.clone(); // spin up a task in the background that processes all incoming statements // TODO: propagate statements on a timer? + let inner_stream = self.network.gossip_messages_for(attestation_topic); self.network .with_spec(move |spec, ctx| { spec.new_consensus(ctx, parent_hash, CurrentConsensus { knowledge, local_session_key, }); - let inner_stream = match rx.try_recv() { - Ok(inner_stream) => inner_stream, - _ => unreachable!("1. The with_gossip closure executed first, 2. the reply should be available") - }; let process_task = MessageProcessTask { inner_stream, parent_hash, table_router: table_router_clone, - exit, }; + executor.spawn(process_task); }); @@ -213,8 +284,10 @@ impl Future for AwaitingCollation { } } -impl Collators for ConsensusNetwork - where P::Api: ParachainHost, +impl Collators for ConsensusNetwork where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, + N: NetworkService, { type Error = NetworkDown; type Collation = AwaitingCollation; diff --git a/network/src/lib.rs b/network/src/lib.rs index 3e3280c5b5088..1d2a9f734f8a9 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -16,9 +16,8 @@ //! Polkadot-specific network implementation. //! -//! This manages gossip of consensus messages for BFT and for parachain statements, -//! parachain block and extrinsic data fetching, communication between collators and validators, -//! and more. +//! This manages routing for parachain statements, parachain block and extrinsic data fetching, +//! communication between collators and validators, and more. extern crate parity_codec as codec; extern crate substrate_network; @@ -30,16 +29,23 @@ extern crate polkadot_availability_store as av_store; extern crate polkadot_primitives; extern crate arrayvec; -extern crate futures; extern crate parking_lot; extern crate tokio; -extern crate rhododendron; +extern crate slice_group_by; +#[macro_use] +extern crate futures; #[macro_use] extern crate log; #[macro_use] extern crate parity_codec_derive; +#[cfg(test)] +extern crate substrate_client; + +#[cfg(test)] +extern crate substrate_keyring; + mod collator_pool; mod local_collations; mod router; @@ -256,7 +262,7 @@ impl PolkadotProtocol { send_polkadot_message( ctx, who, - Message::RequestBlockData(req_id, parent, c_hash) + Message::RequestBlockData(req_id, parent, c_hash), ); in_flight.insert((req_id, who), pending); diff --git a/network/src/router.rs b/network/src/router.rs index 78fb544a0af41..d2e9d901d812e 100644 --- a/network/src/router.rs +++ b/network/src/router.rs @@ -23,21 +23,28 @@ //! and dispatch evaluation work as necessary when new statements come in. use sr_primitives::traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT}; -use polkadot_consensus::{SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork}; +use polkadot_consensus::{ + SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Incoming, + Validated, Outgoing, +}; use polkadot_primitives::{Block, Hash, SessionKey}; -use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt, ParachainHost}; +use polkadot_primitives::parachain::{ + BlockData, Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message +}; -use codec::Encode; -use futures::prelude::*; -use tokio::runtime::TaskExecutor; +use codec::{Encode, Decode}; +use futures::{future, prelude::*}; +use futures::sync::oneshot::{self, Receiver}; use parking_lot::Mutex; -use std::collections::{HashMap, HashSet}; -use std::io; +use std::collections::{hash_map::{Entry, HashMap}, HashSet}; +use std::{io, mem}; use std::sync::Arc; -use consensus::Knowledge; -use super::NetworkService; +use consensus::{NetworkService, Knowledge, Executor}; + +type IngressPair = (ParaId, Vec); +type IngressPairRef<'a> = (ParaId, &'a [Message]); fn attestation_topic(parent_hash: Hash) -> Hash { let mut v = parent_hash.as_ref().to_vec(); @@ -46,26 +53,88 @@ fn attestation_topic(parent_hash: Hash) -> Hash { BlakeTwo256::hash(&v[..]) } +fn incoming_message_topic(parent_hash: Hash, parachain: ParaId) -> Hash { + let mut v = parent_hash.as_ref().to_vec(); + parachain.using_encoded(|s| v.extend(s)); + v.extend(b"incoming"); + + BlakeTwo256::hash(&v[..]) +} + +/// Receiver for block data. +pub struct BlockDataReceiver { + outer: Receiver>, + inner: Option> +} + +impl Future for BlockDataReceiver { + type Item = BlockData; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let map_err = |_| io::Error::new( + io::ErrorKind::Other, + "Sending end of channel hung up", + ); + + if let Some(ref mut inner) = self.inner { + return inner.poll().map_err(map_err); + } + match self.outer.poll().map_err(map_err)? { + Async::Ready(mut inner) => { + let poll_result = inner.poll(); + self.inner = Some(inner); + poll_result.map_err(map_err) + } + Async::NotReady => Ok(Async::NotReady), + } + } +} +/// receiver for incoming data. +#[derive(Clone)] +pub struct IncomingReceiver { + inner: future::Shared> +} + +impl Future for IncomingReceiver { + type Item = Incoming; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(i)) => Ok(Async::Ready(Incoming::clone(&*i))), + Err(_) => Err(io::Error::new( + io::ErrorKind::Other, + "Sending end of channel hung up", + )), + } + } +} + /// Table routing implementation. -pub struct Router

{ +pub struct Router { table: Arc, - network: Arc, + network: Arc, api: Arc

, - task_executor: TaskExecutor, + exit: E, + task_executor: T, parent_hash: Hash, attestation_topic: Hash, knowledge: Arc>, + fetch_incoming: Arc>>, deferred_statements: Arc>, } -impl

Router

{ +impl Router { pub(crate) fn new( table: Arc, - network: Arc, + network: Arc, api: Arc

, - task_executor: TaskExecutor, + task_executor: T, parent_hash: Hash, knowledge: Arc>, + exit: E, ) -> Self { Router { table, @@ -75,7 +144,9 @@ impl

Router

{ parent_hash, attestation_topic: attestation_topic(parent_hash), knowledge, + fetch_incoming: Arc::new(Mutex::new(HashMap::new())), deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())), + exit, } } @@ -85,7 +156,7 @@ impl

Router

{ } } -impl

Clone for Router

{ +impl Clone for Router { fn clone(&self) -> Self { Router { table: self.table.clone(), @@ -95,18 +166,21 @@ impl

Clone for Router

{ parent_hash: self.parent_hash.clone(), attestation_topic: self.attestation_topic.clone(), deferred_statements: self.deferred_statements.clone(), + fetch_incoming: self.fetch_incoming.clone(), knowledge: self.knowledge.clone(), + exit: self.exit.clone(), } } } -impl Router

- where P::Api: ParachainHost +impl Router where + P::Api: ParachainHost, + N: NetworkService, + T: Clone + Executor + Send + 'static, + E: Future + Clone + Send + 'static, { /// Import a statement whose signature has been checked already. - pub(crate) fn import_statement(&self, statement: SignedStatement, exit: Exit) - where Exit: Future + Clone + Send + 'static - { + pub(crate) fn import_statement(&self, statement: SignedStatement) { trace!(target: "p_net", "importing consensus statement {:?}", statement.statement); // defer any statements for which we haven't imported the candidate yet @@ -146,15 +220,47 @@ impl Router

if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { trace!(target: "consensus", "driving statement work to completion"); - self.task_executor.spawn(work.select(exit.clone()).then(|_| Ok(()))); + let work = work.select2(self.exit.clone()).then(|_| Ok(())); + self.task_executor.spawn(work); + } + } + } + + /// Broadcast outgoing messages to peers. + pub(crate) fn broadcast_egress(&self, outgoing: Outgoing) { + use slice_group_by::LinearGroupBy; + + let mut group_messages = Vec::new(); + for egress in outgoing { + let source = egress.from; + let messages = egress.messages.outgoing_messages; + + let groups = LinearGroupBy::new(&messages, |a, b| a.target == b.target); + for group in groups { + let target = match group.get(0) { + Some(msg) => msg.target, + None => continue, // skip empty. + }; + + group_messages.clear(); // reuse allocation from previous iterations. + group_messages.extend(group.iter().map(|msg| msg.data.clone()).map(Message)); + + debug!(target: "consensus", "Circulating messages from {:?} to {:?} at {}", + source, target, self.parent_hash); + + // this is the ingress from source to target, with given messages. + let target_incoming = incoming_message_topic(self.parent_hash, target); + let ingress_for: IngressPairRef = (source, &group_messages[..]); + + self.network.gossip_message(target_incoming, ingress_for.encode()); } } } fn create_work(&self, candidate_hash: Hash, producer: ParachainWork) - -> impl Future + -> impl Future + Send + 'static where - D: Future + Send + 'static, + D: Future + Send + 'static, { let table = self.table.clone(); let network = self.network.clone(); @@ -162,36 +268,97 @@ impl Router

let attestation_topic = self.attestation_topic.clone(); producer.prime(self.api.clone()) - .map(move |produced| { + .map(move |validated| { // store the data before broadcasting statements, so other peers can fetch. knowledge.lock().note_candidate( candidate_hash, - Some(produced.block_data), - produced.extrinsic, + Some(validated.block_data().clone()), + validated.extrinsic().cloned(), ); // propagate the statement. // consider something more targeted than gossip in the future. - let signed = table.sign_and_import(produced.validity); - network.gossip_consensus_message(attestation_topic, signed.encode(), false); + let signed = table.import_validated(validated); + network.gossip_message(attestation_topic, signed.encode()); }) .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) } + +} + +impl Router where + P::Api: ParachainHost, + N: NetworkService, + T: Executor, + E: Future + Clone + Send + 'static, +{ + fn do_fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver { + use polkadot_primitives::BlockId; + let (tx, rx) = { + let mut fetching = self.fetch_incoming.lock(); + match fetching.entry(parachain) { + Entry::Occupied(entry) => return entry.get().clone(), + Entry::Vacant(entry) => { + // has not been requested yet. + let (tx, rx) = oneshot::channel(); + let rx = IncomingReceiver { inner: rx.shared() }; + entry.insert(rx.clone()); + + (tx, rx) + } + } + }; + + let parent_hash = self.parent_hash; + let topic = incoming_message_topic(parent_hash, parachain); + let gossip_messages = self.network.gossip_messages_for(topic) + .map_err(|()| panic!("unbounded receivers do not throw errors; qed")) + .filter_map(|msg| IngressPair::decode(&mut msg.as_slice())); + + let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain) + .map_err(|e| format!("Cannot fetch ingress for parachain {:?} at {:?}: {:?}", + parachain, parent_hash, e) + ); + + let work = canon_roots.into_future() + .and_then(move |ingress_roots| match ingress_roots { + None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)), + Some(roots) => Ok(roots.into_iter().collect()) + }) + .and_then(move |ingress_roots| ComputeIngress { + inner: gossip_messages, + ingress_roots, + incoming: Vec::new(), + }) + .map(move |incoming| if let Some(i) = incoming { let _ = tx.send(i); }) + .select2(self.exit.clone()) + .then(|_| Ok(())); + + self.task_executor.spawn(work); + + rx + } } -impl TableRouter for Router

- where P::Api: ParachainHost +impl TableRouter for Router where + P::Api: ParachainHost, + N: NetworkService, + T: Clone + Executor + Send + 'static, + E: Future + Clone + Send + 'static, { type Error = io::Error; type FetchCandidate = BlockDataReceiver; + type FetchIncoming = IncomingReceiver; fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) { - // give to network to make available. + // produce a signed statement let hash = receipt.hash(); - let candidate = self.table.sign_and_import(GenericStatement::Candidate(receipt)); + let validated = Validated::collated_local(receipt, block_data.clone(), extrinsic.clone()); + let statement = self.table.import_validated(validated); + // give to network to make available. self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic)); - self.network.gossip_consensus_message(self.attestation_topic, candidate.encode(), false); + self.network.gossip_message(self.attestation_topic, statement.encode()); } fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver { @@ -204,44 +371,27 @@ impl TableRouter for Router

}); BlockDataReceiver { outer: rx, inner: None } } + + fn fetch_incoming(&self, parachain: ParaId) -> Self::FetchIncoming { + self.do_fetch_incoming(parachain) + } } -impl

Drop for Router

{ +impl Drop for Router { fn drop(&mut self) { let parent_hash = self.parent_hash.clone(); self.network.with_spec(move |spec, _| spec.remove_consensus(&parent_hash)); - } -} + self.network.drop_gossip(self.attestation_topic); -/// Receiver for block data. -pub struct BlockDataReceiver { - outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver>, - inner: Option<::futures::sync::oneshot::Receiver> -} - -impl Future for BlockDataReceiver { - type Item = BlockData; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - if let Some(ref mut inner) = self.inner { - return inner - .poll() - .map_err(|_| io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - )) - } - if let Ok(futures::Async::Ready(mut inner)) = self.outer.poll() { - let poll_result = inner.poll(); - self.inner = Some(inner); - return poll_result - .map_err(|_| io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - )) + { + let mut incoming_fetched = self.fetch_incoming.lock(); + for (para_id, _) in incoming_fetched.drain() { + self.network.drop_gossip(incoming_message_topic( + self.parent_hash, + para_id, + )); + } } - Ok(futures::Async::NotReady) } } @@ -300,10 +450,63 @@ impl DeferredStatements { } } +// computes ingress from incoming stream of messages. +// returns `None` if the stream concludes too early. +#[must_use = "futures do nothing unless polled"] +struct ComputeIngress { + ingress_roots: HashMap, + incoming: Vec, + inner: S, +} + +impl Future for ComputeIngress where S: Stream { + type Item = Option; + type Error = S::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if self.ingress_roots.is_empty() { + return Ok(Async::Ready( + Some(mem::replace(&mut self.incoming, Vec::new())) + )) + } + + let (para_id, messages) = match try_ready!(self.inner.poll()) { + None => return Ok(Async::Ready(None)), + Some(next) => next, + }; + + match self.ingress_roots.entry(para_id) { + Entry::Vacant(_) => continue, + Entry::Occupied(occupied) => { + let canon_root = occupied.get().clone(); + let messages = messages.iter().map(|m| &m.0[..]); + if ::polkadot_consensus::message_queue_root(messages) != canon_root { + continue; + } + + occupied.remove(); + } + } + + let pos = self.incoming.binary_search_by_key( + ¶_id, + |&(id, _)| id, + ) + .err() + .expect("incoming starts empty and only inserted when \ + para_id not inserted before; qed"); + + self.incoming.insert(pos, (para_id, messages)); + } + } +} + #[cfg(test)] mod tests { use super::*; use substrate_primitives::H512; + use futures::stream; #[test] fn deferred_statements_works() { @@ -345,4 +548,70 @@ mod tests { assert!(traces.is_empty()); } } + + #[test] + fn compute_ingress_works() { + let actual_messages = [ + ( + ParaId::from(1), + vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], + ), + ( + ParaId::from(2), + vec![ + Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]), + Message(b"hello world".to_vec()), + ], + ), + ( + ParaId::from(5), + vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])], + ), + ]; + + let roots: HashMap<_, _> = actual_messages.iter() + .map(|&(para_id, ref messages)| ( + para_id, + ::polkadot_consensus::message_queue_root(messages.iter().map(|m| &m.0)), + )) + .collect(); + + let inputs = [ + ( + ParaId::from(1), // wrong message. + vec![Message(vec![1, 1, 2, 2]), Message(vec![3, 3, 4, 4])], + ), + ( + ParaId::from(1), + vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], + ), + ( + ParaId::from(1), // duplicate + vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])], + ), + + ( + ParaId::from(5), // out of order + vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])], + ), + ( + ParaId::from(1234), // un-routed parachain. + vec![Message(vec![9, 9, 9, 9])], + ), + ( + ParaId::from(2), + vec![ + Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]), + Message(b"hello world".to_vec()), + ], + ), + ]; + let ingress = ComputeIngress { + ingress_roots: roots, + incoming: Vec::new(), + inner: stream::iter_ok::<_, ()>(inputs.iter().cloned()), + }; + + assert_eq!(ingress.wait().unwrap().unwrap(), actual_messages); + } } diff --git a/network/src/tests/consensus.rs b/network/src/tests/consensus.rs new file mode 100644 index 0000000000000..990238d84e1dc --- /dev/null +++ b/network/src/tests/consensus.rs @@ -0,0 +1,466 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Tests and helpers for consensus networking. + +use consensus::NetworkService; +use substrate_network::{consensus_gossip::ConsensusMessage, Context as NetContext}; +use substrate_primitives::{Ed25519AuthorityId, NativeOrEncoded}; +use substrate_keyring::Keyring; +use {PolkadotProtocol}; + +use polkadot_consensus::{SharedTable, MessagesFrom, Network, TableRouter}; +use polkadot_primitives::{AccountId, Block, Hash, Header, BlockId}; +use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage}; +use parking_lot::Mutex; +use substrate_client::error::Result as ClientResult; +use substrate_client::runtime_api::{Core, RuntimeVersion, ApiExt}; +use sr_primitives::ExecutionContext; +use sr_primitives::traits::{ApiRef, ProvideRuntimeApi}; + +use std::collections::HashMap; +use std::sync::Arc; +use futures::{prelude::*, sync::mpsc}; +use tokio::runtime::{Runtime, TaskExecutor}; + +use super::TestContext; + +#[derive(Clone, Copy)] +struct NeverExit; + +impl Future for NeverExit { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } +} + +struct GossipRouter { + incoming_messages: mpsc::UnboundedReceiver<(Hash, ConsensusMessage)>, + incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender)>, + outgoing: Vec<(Hash, mpsc::UnboundedSender)>, + messages: Vec<(Hash, ConsensusMessage)>, +} + +impl GossipRouter { + fn add_message(&mut self, topic: Hash, message: ConsensusMessage) { + self.outgoing.retain(|&(ref o_topic, ref sender)| { + o_topic != &topic || sender.unbounded_send(message.clone()).is_ok() + }); + self.messages.push((topic, message)); + } + + fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender) { + for message in self.messages.iter() + .filter(|&&(ref t, _)| t == &topic) + .map(|&(_, ref msg)| msg.clone()) + { + if let Err(_) = sender.unbounded_send(message) { return } + } + + self.outgoing.push((topic, sender)); + } +} + +impl Future for GossipRouter { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + match self.incoming_messages.poll().unwrap() { + Async::Ready(Some((topic, message))) => self.add_message(topic, message), + Async::Ready(None) => panic!("ended early."), + Async::NotReady => break, + } + } + + loop { + match self.incoming_streams.poll().unwrap() { + Async::Ready(Some((topic, sender))) => self.add_outgoing(topic, sender), + Async::Ready(None) => panic!("ended early."), + Async::NotReady => break, + } + } + + Ok(Async::NotReady) + } +} + + +#[derive(Clone)] +struct GossipHandle { + send_message: mpsc::UnboundedSender<(Hash, ConsensusMessage)>, + send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender)>, +} + +fn make_gossip() -> (GossipRouter, GossipHandle) { + let (message_tx, message_rx) = mpsc::unbounded(); + let (listener_tx, listener_rx) = mpsc::unbounded(); + + ( + GossipRouter { + incoming_messages: message_rx, + incoming_streams: listener_rx, + outgoing: Vec::new(), + messages: Vec::new(), + }, + GossipHandle { send_message: message_tx, send_listener: listener_tx }, + ) +} + +struct TestNetwork { + proto: Arc>, + gossip: GossipHandle, +} + +impl NetworkService for TestNetwork { + fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + let _ = self.gossip.send_listener.unbounded_send((topic, tx)); + rx + } + + fn gossip_message(&self, topic: Hash, message: ConsensusMessage) { + let _ = self.gossip.send_message.unbounded_send((topic, message)); + } + + fn drop_gossip(&self, _topic: Hash) {} + + fn with_spec(&self, with: F) + where F: FnOnce(&mut PolkadotProtocol, &mut NetContext) + { + let mut context = TestContext::default(); + let res = with(&mut *self.proto.lock(), &mut context); + // TODO: send context to worker for message routing. + res + } +} + +#[derive(Default)] +struct ApiData { + validators: Vec, + duties: Vec, + active_parachains: Vec, + ingress: HashMap>, +} + +#[derive(Default, Clone)] +struct TestApi { + data: Arc>, +} + +struct RuntimeApi { + data: Arc>, +} + +impl ProvideRuntimeApi for TestApi { + type Api = RuntimeApi; + + fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> { + RuntimeApi { data: self.data.clone() }.into() + } +} + +impl Core for RuntimeApi { + fn version_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult> { + unimplemented!("Not required for testing!") + } + + fn authorities_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult>> { + unimplemented!("Not required for testing!") + } + + fn execute_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<(Block)>, + _: Vec, + ) -> ClientResult> { + unimplemented!("Not required for testing!") + } + + fn initialise_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<&Header>, + _: Vec, + ) -> ClientResult> { + unimplemented!("Not required for testing!") + } +} + +impl ApiExt for RuntimeApi { + fn map_api_result Result, R, E>( + &self, + _: F + ) -> Result { + unimplemented!("Not required for testing!") + } + + fn runtime_version_at(&self, _: &BlockId) -> ClientResult { + unimplemented!("Not required for testing!") + } +} + +impl ParachainHost for RuntimeApi { + fn validators_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult>> { + Ok(NativeOrEncoded::Native(self.data.lock().validators.clone())) + } + + fn duty_roster_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult> { + + Ok(NativeOrEncoded::Native(DutyRoster { + validator_duty: self.data.lock().duties.clone(), + })) + } + + fn active_parachains_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult>> { + Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone())) + } + + fn parachain_head_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option, + _: Vec, + ) -> ClientResult>>> { + Ok(NativeOrEncoded::Native(Some(Vec::new()))) + } + + fn parachain_code_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option, + _: Vec, + ) -> ClientResult>>> { + Ok(NativeOrEncoded::Native(Some(Vec::new()))) + } + + fn ingress_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + id: Option, + _: Vec, + ) -> ClientResult>>> { + let id = id.unwrap(); + Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned())) + } +} + +type TestConsensusNetwork = ::consensus::ConsensusNetwork< + TestApi, + NeverExit, + TestNetwork, + TaskExecutor, +>; + +struct Built { + gossip: GossipRouter, + api_handle: Arc>, + networks: Vec, +} + +fn build_network(n: usize, executor: TaskExecutor) -> Built { + let (gossip_router, gossip_handle) = make_gossip(); + let api_handle = Arc::new(Mutex::new(Default::default())); + let runtime_api = Arc::new(TestApi { data: api_handle.clone() }); + + let networks = (0..n).map(|_| { + let net = Arc::new(TestNetwork { + proto: Arc::new(Mutex::new(PolkadotProtocol::new(None))), + gossip: gossip_handle.clone(), + }); + + TestConsensusNetwork::new( + net, + NeverExit, + runtime_api.clone(), + executor.clone(), + ) + }); + + let networks: Vec<_> = networks.collect(); + + Built { + gossip: gossip_router, + api_handle, + networks, + } +} + +#[derive(Default)] +struct IngressBuilder { + egress: HashMap<(ParaId, ParaId), Vec>>, +} + +impl IngressBuilder { + fn add_messages(&mut self, source: ParaId, messages: &[OutgoingMessage]) { + for message in messages { + let target = message.target; + self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone()); + } + } + + fn build(self) -> HashMap> { + let mut map = HashMap::new(); + for ((source, target), messages) in self.egress { + map.entry(target).or_insert_with(Vec::new) + .push((source, polkadot_consensus::message_queue_root(&messages))); + } + + for roots in map.values_mut() { + roots.sort_by_key(|&(para_id, _)| para_id); + } + + map + } +} + +fn make_table(data: &ApiData, local_key: &Keyring, parent_hash: Hash) -> Arc { + use ::av_store::Store; + + let store = Store::new_in_memory(); + let authorities: Vec<_> = data.validators.iter().map(|v| v.to_fixed_bytes().into()).collect(); + let (group_info, _) = ::polkadot_consensus::make_group_info( + DutyRoster { validator_duty: data.duties.clone() }, + &authorities, + local_key.to_raw_public().into() + ).unwrap(); + + Arc::new(SharedTable::new( + group_info, + Arc::new(local_key.pair()), + parent_hash, + store, + )) +} + +#[test] +fn ingress_fetch_works() { + let mut runtime = Runtime::new().unwrap(); + let built = build_network(3, runtime.executor()); + + let id_a: ParaId = 1.into(); + let id_b: ParaId = 2.into(); + let id_c: ParaId = 3.into(); + + let key_a = Keyring::Alice; + let key_b = Keyring::Bob; + let key_c = Keyring::Charlie; + + let messages_from_a = vec![ + OutgoingMessage { target: id_b, data: vec![1, 2, 3] }, + OutgoingMessage { target: id_b, data: vec![3, 4, 5] }, + OutgoingMessage { target: id_c, data: vec![9, 9, 9] }, + ]; + + let messages_from_b = vec![ + OutgoingMessage { target: id_a, data: vec![1, 1, 1, 1, 1,] }, + OutgoingMessage { target: id_c, data: b"hello world".to_vec() }, + ]; + + let messages_from_c = vec![ + OutgoingMessage { target: id_a, data: b"dog42".to_vec() }, + OutgoingMessage { target: id_b, data: b"dogglesworth".to_vec() }, + ]; + + let ingress = { + let mut builder = IngressBuilder::default(); + builder.add_messages(id_a, &messages_from_a); + builder.add_messages(id_b, &messages_from_b); + builder.add_messages(id_c, &messages_from_c); + + builder.build() + }; + + let parent_hash = [1; 32].into(); + + let (router_a, router_b, router_c) = { + let mut api_handle = built.api_handle.lock(); + *api_handle = ApiData { + active_parachains: vec![id_a, id_b, id_c], + duties: vec![Chain::Parachain(id_a), Chain::Parachain(id_b), Chain::Parachain(id_c)], + validators: vec![ + key_a.to_raw_public().into(), + key_b.to_raw_public().into(), + key_c.to_raw_public().into(), + ], + ingress, + }; + + ( + built.networks[0].communication_for( + make_table(&*api_handle, &key_a, parent_hash), + vec![MessagesFrom::from_messages(id_a, messages_from_a)], + ), + built.networks[1].communication_for( + make_table(&*api_handle, &key_b, parent_hash), + vec![MessagesFrom::from_messages(id_b, messages_from_b)], + ), + built.networks[2].communication_for( + make_table(&*api_handle, &key_c, parent_hash), + vec![MessagesFrom::from_messages(id_c, messages_from_c)], + ), + ) + }; + + // make sure everyone can get ingress for their own parachain. + let fetch_a = router_a.fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a")); + let fetch_b = router_b.fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b")); + let fetch_c = router_c.fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c")); + + let work = fetch_a.join3(fetch_b, fetch_c); + runtime.spawn(built.gossip.then(|_| Ok(()))); // in background. + runtime.block_on(work).unwrap(); +} diff --git a/network/src/tests.rs b/network/src/tests/mod.rs similarity index 99% rename from network/src/tests.rs rename to network/src/tests/mod.rs index 20ed401f555e5..60cd03f7dffdb 100644 --- a/network/src/tests.rs +++ b/network/src/tests/mod.rs @@ -34,6 +34,8 @@ use substrate_network::{ use std::sync::Arc; use futures::Future; +mod consensus; + #[derive(Default)] struct TestContext { disabled: Vec, diff --git a/parachain/src/lib.rs b/parachain/src/lib.rs index 90a8df3ccfaf3..434b560c75707 100644 --- a/parachain/src/lib.rs +++ b/parachain/src/lib.rs @@ -72,7 +72,7 @@ pub mod wasm_executor; pub mod wasm_api; /// Validation parameters for evaluating the parachain validity function. -// TODO: consolidated ingress and balance downloads +// TODO: balance downloads #[derive(PartialEq, Eq, Decode)] #[cfg_attr(feature = "std", derive(Debug, Encode))] pub struct ValidationParams { @@ -80,6 +80,8 @@ pub struct ValidationParams { pub block_data: Vec, /// Previous head-data. pub parent_head: Vec, + /// Incoming messages. + pub ingress: Vec, } /// The result of parachain validation. @@ -91,8 +93,17 @@ pub struct ValidationResult { pub head_data: Vec, } +/// An incoming message. +#[derive(PartialEq, Eq, Decode)] +#[cfg_attr(feature = "std", derive(Debug, Encode))] +pub struct IncomingMessage { + /// The source parachain. + pub source: u32, + /// The data of the message. + pub data: Vec, +} + /// A reference to a message. -#[cfg(feature = "std")] pub struct MessageRef<'a> { /// The target parachain. pub target: u32, diff --git a/parachain/src/wasm_api.rs b/parachain/src/wasm_api.rs index 0fa8a2de1a42a..8cda5c3da6cbd 100644 --- a/parachain/src/wasm_api.rs +++ b/parachain/src/wasm_api.rs @@ -17,7 +17,7 @@ //! Utilities for writing parachain WASM. use codec::{Encode, Decode}; -use super::{ValidationParams, ValidationResult, Message}; +use super::{ValidationParams, ValidationResult, MessageRef}; mod ll { extern "C" { @@ -55,7 +55,7 @@ pub fn write_result(result: ValidationResult) -> usize { } /// Post a message to another parachain. -pub fn post_message(message: &Message) { +pub fn post_message(message: MessageRef) { let data_ptr = message.data.as_ptr(); let data_len = message.data.len(); diff --git a/parachain/tests/adder.rs b/parachain/tests/adder.rs index b0086e4a72343..c03cc98579746 100644 --- a/parachain/tests/adder.rs +++ b/parachain/tests/adder.rs @@ -22,7 +22,7 @@ extern crate parity_codec as codec; extern crate polkadot_parachain as parachain; extern crate tiny_keccak; -use parachain::{MessageRef, ValidationParams}; +use parachain::{MessageRef, IncomingMessage, ValidationParams}; use parachain::wasm_executor::{Externalities, ExternalitiesError}; use codec::{Decode, Encode}; @@ -46,6 +46,12 @@ struct BlockData { add: u64, } +#[derive(Encode, Decode)] +struct AddMessage { + /// amount to add. + amount: u64, +} + struct DummyExt; impl Externalities for DummyExt { fn post_message(&mut self, _message: MessageRef) -> Result<(), ExternalitiesError> { @@ -81,6 +87,7 @@ fn execute_good_on_parent() { ValidationParams { parent_head: parent_head.encode(), block_data: block_data.encode(), + ingress: Vec::new(), }, &mut DummyExt, ).unwrap(); @@ -115,6 +122,7 @@ fn execute_good_chain_on_parent() { ValidationParams { parent_head: parent_head.encode(), block_data: block_data.encode(), + ingress: Vec::new(), }, &mut DummyExt, ).unwrap(); @@ -149,7 +157,45 @@ fn execute_bad_on_parent() { ValidationParams { parent_head: parent_head.encode(), block_data: block_data.encode(), + ingress: Vec::new(), }, &mut DummyExt, ).unwrap_err(); } + +#[test] +fn processes_messages() { + let parent_head = HeadData { + number: 0, + parent_hash: [0; 32], + post_state: hash_state(0), + }; + + let block_data = BlockData { + state: 0, + add: 512, + }; + + let bad_message_data = vec![1]; + assert!(AddMessage::decode(&mut &bad_message_data[..]).is_none()); + + let ret = parachain::wasm_executor::validate_candidate( + TEST_CODE, + ValidationParams { + parent_head: parent_head.encode(), + block_data: block_data.encode(), + ingress: vec![ + IncomingMessage { source: 1, data: (AddMessage { amount: 256 }).encode() }, + IncomingMessage { source: 2, data: bad_message_data }, + IncomingMessage { source: 3, data: (AddMessage { amount: 256 }).encode() }, + ], + }, + &mut DummyExt, + ).unwrap(); + + let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap(); + + assert_eq!(new_head.number, 1); + assert_eq!(new_head.parent_hash, hash_head(&parent_head)); + assert_eq!(new_head.post_state, hash_state(1024)); +} diff --git a/parachain/tests/res/adder.wasm b/parachain/tests/res/adder.wasm index 3261356729834..387e336777204 100644 Binary files a/parachain/tests/res/adder.wasm and b/parachain/tests/res/adder.wasm differ diff --git a/primitives/src/parachain.rs b/primitives/src/parachain.rs index c304cb1bf0abd..d5a3b77a64d8a 100644 --- a/primitives/src/parachain.rs +++ b/primitives/src/parachain.rs @@ -177,7 +177,7 @@ pub struct Collation { /// Parachain ingress queue message. #[derive(PartialEq, Eq, Clone)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Decode, Debug))] pub struct Message(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); /// Consolidated ingress queue data. @@ -288,6 +288,9 @@ decl_runtime_apis! { fn parachain_head(id: Id) -> Option>; /// Get the given parachain's head code blob. fn parachain_code(id: Id) -> Option>; + /// Get the ingress roots to a specific parachain at a + /// block. + fn ingress(to: Id) -> Option>; } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 8de8ccd90c53f..7814df01fe599 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -28,11 +28,9 @@ extern crate secp256k1; #[cfg(test)] extern crate tiny_keccak; - #[macro_use] extern crate bitvec; -#[macro_use] extern crate parity_codec_derive; extern crate parity_codec as codec; @@ -364,6 +362,9 @@ impl_runtime_apis! { fn parachain_code(id: parachain::Id) -> Option> { Parachains::parachain_code(&id) } + fn ingress(to: parachain::Id) -> Option> { + Parachains::ingress(to) + } } impl fg_primitives::GrandpaApi for Runtime { diff --git a/runtime/src/parachains.rs b/runtime/src/parachains.rs index 8e1b4c11491cd..504109c3b3819 100644 --- a/runtime/src/parachains.rs +++ b/runtime/src/parachains.rs @@ -21,6 +21,7 @@ use codec::Decode; use bitvec::BigEndian; use sr_primitives::traits::{Hash as HashT, BlakeTwo256}; +use primitives::Hash; use primitives::parachain::{Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement}; use {system, session}; @@ -45,8 +46,10 @@ decl_storage! { pub Parachains get(active_parachains): Vec; // The parachains registered at present. pub Code get(parachain_code): map ParaId => Option>; - // The heads of the parachains registered at present. these are kept sorted. + // The heads of the parachains registered at present. pub Heads get(parachain_head): map ParaId => Option>; + // message routing roots (from, to). + pub Routing: map (ParaId, ParaId) => Option; // Did the parachain heads get updated in this block? DidUpdate: bool; @@ -68,6 +71,7 @@ decl_storage! { for (id, code, genesis) in p { let code_key = Self::hash(&>::key_for(&id)).to_vec(); let head_key = Self::hash(&>::key_for(&id)).to_vec(); + // no ingress -- a chain cannot be routed to until it is live. storage.insert(code_key, code.encode()); storage.insert(head_key, genesis.encode()); @@ -116,6 +120,11 @@ decl_module! { for head in heads { let id = head.parachain_index(); >::insert(id, head.candidate.head_data.0); + + // update egress. + for &(to, root) in &head.candidate.egress_queue_roots { + >::insert((id, to), root); + } } >::put(true); @@ -144,12 +153,20 @@ decl_module! { let mut parachains = Self::active_parachains(); match parachains.binary_search(&id) { Ok(idx) => { parachains.remove(idx); } - Err(_) => {} + Err(_) => return Ok(()), } >::remove(id); >::remove(id); + + // clear all routing entries to and from other parachains. + for other in parachains.iter().cloned() { + >::remove((id, other)); + >::remove((other, id)); + } + >::put(parachains); + Ok(()) } @@ -193,17 +210,17 @@ impl Module { // shuffle for i in 0..(validator_count - 1) { - // 8 bytes of entropy used per cycle, 32 bytes entropy per hash - let offset = (i * 8 % 32) as usize; + // 4 bytes of entropy used per cycle, 32 bytes entropy per hash + let offset = (i * 4 % 32) as usize; // number of roles remaining to select from. let remaining = (validator_count - i) as usize; - // 4 * 2 32-bit ints per 256-bit seed. + // 8 32-bit ints per 256-bit seed. let val_index = u32::decode(&mut &seed[offset..offset + 4]).expect("using 4 bytes for a 32-bit quantity") as usize % remaining; - if offset == 24 { - // into the last 8 bytes - rehash to gather new entropy + if offset == 28 { + // into the last 4 bytes - rehash to gather new entropy seed = BlakeTwo256::hash(seed.as_ref()); } @@ -216,6 +233,21 @@ impl Module { } } + /// Calculate the ingress to a specific parachain. + /// + /// Yields a list of parachains being routed from, and the egress + /// queue roots to consider. + pub fn ingress(to: ParaId) -> Option> { + let active_parachains = Self::active_parachains(); + if !active_parachains.contains(&to) { return None } + + Some(active_parachains.into_iter().filter(|i| i != &to) + .filter_map(move |from| { + >::get((from, to.clone())).map(move |h| (from, h)) + }) + .collect()) + } + // check the attestations on these candidates. The candidates should have been checked // that each candidates' chain ID is valid. fn check_attestations(attested_candidates: &[AttestedCandidate]) -> Result { @@ -687,4 +719,74 @@ mod tests { ).is_err()); }); } + + #[test] + fn ingress_works() { + let parachains = vec![ + (0u32.into(), vec![], vec![]), + (1u32.into(), vec![], vec![]), + (99u32.into(), vec![1, 2, 3], vec![4, 5, 6]), + ]; + + with_externalities(&mut new_test_ext(parachains), || { + system::Module::::set_random_seed([0u8; 32].into()); + let from_a = vec![(1.into(), [1; 32].into())]; + let mut candidate_a = AttestedCandidate { + validity_votes: vec![], + candidate: CandidateReceipt { + parachain_index: 0.into(), + collator: Default::default(), + signature: Default::default(), + head_data: HeadData(vec![1, 2, 3]), + balance_uploads: vec![], + egress_queue_roots: from_a.clone(), + fees: 0, + block_data_hash: Default::default(), + } + }; + + let from_b = vec![(99.into(), [1; 32].into())]; + let mut candidate_b = AttestedCandidate { + validity_votes: vec![], + candidate: CandidateReceipt { + parachain_index: 1.into(), + collator: Default::default(), + signature: Default::default(), + head_data: HeadData(vec![1, 2, 3]), + balance_uploads: vec![], + egress_queue_roots: from_b.clone(), + fees: 0, + block_data_hash: Default::default(), + } + }; + + make_attestations(&mut candidate_a); + make_attestations(&mut candidate_b); + + assert_eq!(Parachains::ingress(ParaId::from(1)), Some(Vec::new())); + assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new())); + + assert!(Parachains::dispatch( + Call::set_heads(vec![candidate_a, candidate_b]), + Origin::INHERENT, + ).is_ok()); + + assert_eq!( + Parachains::ingress(ParaId::from(1)), + Some(vec![(0.into(), [1; 32].into())]), + ); + + assert_eq!( + Parachains::ingress(ParaId::from(99)), + Some(vec![(1.into(), [1; 32].into())]), + ); + + assert_ok!(Parachains::deregister_parachain(1u32.into())); + + // after deregistering, there is no ingress to 1 and we stop routing + // from 1. + assert_eq!(Parachains::ingress(ParaId::from(1)), None); + assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new())); + }); + } } diff --git a/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm b/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm index c32a6f7bffa35..264f40f644986 100644 Binary files a/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm and b/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm differ diff --git a/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm b/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm index dfad2c4181b48..9b0138624905c 100755 Binary files a/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm and b/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm differ diff --git a/service/src/lib.rs b/service/src/lib.rs index 539e21f3f837d..5e88853891339 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -216,6 +216,7 @@ construct_service_factory! { service.network(), service.on_exit(), service.client(), + executor.clone(), ); let proposer_factory = ::consensus::ProposerFactory::new( client.clone(), diff --git a/test-parachains/adder/collator/src/main.rs b/test-parachains/adder/collator/src/main.rs index 2ee3e6a6dc263..364cfa4cbee56 100644 --- a/test-parachains/adder/collator/src/main.rs +++ b/test-parachains/adder/collator/src/main.rs @@ -58,7 +58,7 @@ impl ParachainContext for AdderContext { fn produce_candidate>( &self, last_head: HeadData, - _ingress: I, + ingress: I, ) -> Result<(BlockData, HeadData), InvalidHead> { let adder_head = AdderHead::decode(&mut &last_head.0[..]) @@ -79,7 +79,11 @@ impl ParachainContext for AdderContext { add: adder_head.number % 100, }; - let next_head = ::adder::execute(adder_head.hash(), adder_head, &next_body) + let from_messages = ::adder::process_messages( + ingress.into_iter().map(|(_, msg)| msg.0) + ); + + let next_head = ::adder::execute(adder_head.hash(), adder_head, &next_body, from_messages) .expect("good execution params; qed"); let encoded_head = HeadData(next_head.encode()); diff --git a/test-parachains/adder/src/lib.rs b/test-parachains/adder/src/lib.rs index bc2e2ca02933d..2340f51a94da0 100644 --- a/test-parachains/adder/src/lib.rs +++ b/test-parachains/adder/src/lib.rs @@ -24,7 +24,7 @@ extern crate parity_codec; extern crate polkadot_parachain as parachain; extern crate tiny_keccak; -use parity_codec::Encode; +use parity_codec::{Encode, Decode}; /// Head data for this parachain. #[derive(Default, Clone, Hash, Eq, PartialEq, Encode, Decode)] @@ -56,13 +56,35 @@ pub fn hash_state(state: u64) -> [u8; 32] { ::tiny_keccak::keccak256(state.encode().as_slice()) } +#[derive(Default, Encode, Decode)] +pub struct AddMessage { + /// The amount to add based on this message. + pub amount: u64, +} + /// Start state mismatched with parent header's state hash. #[derive(Debug)] pub struct StateMismatch; +/// Process all incoming messages, yielding the amount of addition from messages. +/// +/// Ignores unknown message kinds. +pub fn process_messages(iterable: I) -> u64 + where I: IntoIterator, T: AsRef<[u8]> +{ + iterable.into_iter() + .filter_map(|data| AddMessage::decode(&mut data.as_ref())) + .fold(0u64, |a, c| a.overflowing_add(c.amount).0) +} + /// Execute a block body on top of given parent head, producing new parent head /// if valid. -pub fn execute(parent_hash: [u8; 32], parent_head: HeadData, block_data: &BlockData) -> Result { +pub fn execute( + parent_hash: [u8; 32], + parent_head: HeadData, + block_data: &BlockData, + from_messages: u64, +) -> Result { debug_assert_eq!(parent_hash, parent_head.hash()); if hash_state(block_data.state) != parent_head.post_state { @@ -70,6 +92,7 @@ pub fn execute(parent_hash: [u8; 32], parent_head: HeadData, block_data: &BlockD } let new_state = block_data.state.overflowing_add(block_data.add).0; + let new_state = new_state.overflowing_add(from_messages).0; Ok(HeadData { number: parent_head.number + 1, diff --git a/test-parachains/adder/wasm/Cargo.toml b/test-parachains/adder/wasm/Cargo.toml index 55e96b6b66565..d450d06302907 100644 --- a/test-parachains/adder/wasm/Cargo.toml +++ b/test-parachains/adder/wasm/Cargo.toml @@ -6,9 +6,8 @@ authors = ["Parity Technologies "] [dependencies] adder = { path = ".." } polkadot-parachain = { path = "../../../parachain", default-features = false, features = ["wasm-api"] } -wee_alloc = { version = "0.4.1" } -pwasm-libc = { version = "0.2" } tiny-keccak = "1.4" +dlmalloc = { version = "0.1.2", features = ["global"] } [lib] crate-type = ["cdylib"] diff --git a/test-parachains/adder/wasm/src/lib.rs b/test-parachains/adder/wasm/src/lib.rs index 3a687a748afe7..87db11c6f0a6c 100644 --- a/test-parachains/adder/wasm/src/lib.rs +++ b/test-parachains/adder/wasm/src/lib.rs @@ -23,13 +23,13 @@ )] extern crate alloc; -extern crate pwasm_libc; extern crate adder; extern crate polkadot_parachain as parachain; extern crate tiny_keccak; +extern crate dlmalloc; #[global_allocator] -static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT; +static ALLOC: dlmalloc::GlobalDlmalloc = dlmalloc::GlobalDlmalloc; use core::{intrinsics, panic}; use parachain::ValidationResult; @@ -63,7 +63,13 @@ pub extern fn validate(offset: usize, len: usize) -> usize { let parent_hash = ::tiny_keccak::keccak256(¶ms.parent_head[..]); - match ::adder::execute(parent_hash, parent_head, &block_data) { + // we also add based on incoming data from messages. ignoring unknown message + // kinds. + let from_messages = ::adder::process_messages( + params.ingress.iter().map(|incoming| &incoming.data[..]) + ); + + match ::adder::execute(parent_hash, parent_head, &block_data, from_messages) { Ok(new_head) => parachain::wasm_api::write_result( ValidationResult { head_data: new_head.encode() } ), diff --git a/test-parachains/build.sh b/test-parachains/build.sh index d7dced137993b..06183fa2e5d8d 100755 --- a/test-parachains/build.sh +++ b/test-parachains/build.sh @@ -2,7 +2,7 @@ set -e # Make LLD produce a binary that imports memory from the outside environment. -export RUSTFLAGS="-C link-arg=--import-memory,--export-table -C lto=fat -C panic=abort" +export RUSTFLAGS="-C link-arg=--import-memory -C link-arg=--export-table -C panic=abort" for i in adder do