Skip to content

Commit

Permalink
Interchain message-passing (paritytech#117)
Browse files Browse the repository at this point in the history
* compute ingress and routing in polkadot runtime

* extract parent candidates from block when beginning consensus

* fetch incoming messages when validating

* fix consensus tests

* parachain wasm execution uses messages

* update parachain tests to check if messages are executed

* abstract out network service to make room for network tests

* skeleton for incoming data fetch

* collate ingress from consensus-gossip

* keep track of validated candidates in the shared-table

* add some shared_table tests for new behavior

* broadcast egress messages on gossip

* test compute_ingress

* move network tests to module folder

* dummy network for consensus-network tests

* make consensus network generic over executor

* test egress broadcast and ingress fetch

* fix test compilation

* address some grumbles

* address grumbles and fix parachain shuffle

* remove broadcast parameter from consensus network trait
  • Loading branch information
rphmeier committed Feb 19, 2019
1 parent c30e000 commit 66c9580
Show file tree
Hide file tree
Showing 26 changed files with 1,732 additions and 367 deletions.
30 changes: 18 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 35 additions & 41 deletions consensus/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,41 @@ use std::thread;
use std::time::{Duration, Instant};
use std::sync::Arc;

use client::{BlockchainEvents, ChainHead, BlockBody};
use client::{error::Result as ClientResult, BlockchainEvents, ChainHead, BlockBody};
use client::block_builder::api::BlockBuilder;
use client::blockchain::HeaderBackend;
use client::runtime_api::Core;
use primitives::ed25519;
use futures::prelude::*;
use polkadot_primitives::{Block, BlockId};
use polkadot_primitives::parachain::ParachainHost;
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
use extrinsic_store::Store as ExtrinsicStore;
use runtime_primitives::traits::ProvideRuntimeApi;
use runtime_primitives::traits::{ProvideRuntimeApi, Header as HeaderT};

use tokio::runtime::TaskExecutor;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;

use super::{Network, Collators};
use super::{Network, Collators, TableRouter};

/// Gets a list of the candidates in a block.
pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId)
-> ClientResult<Option<impl Iterator<Item=CandidateReceipt>>>
{
use codec::{Encode, Decode};
use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic};

let extrinsics = client.block_body(block)?;
Ok(extrinsics
.into_iter()
.filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice()))
.filter_map(|ex| match ex.function {
Call::Parachains(ParachainsCall::set_heads(heads)) =>
Some(heads.into_iter().map(|c| c.candidate)),
_ => None,
})
.next())
}

// creates a task to prune redundant entries in availability store upon block finalization
//
Expand All @@ -52,47 +71,20 @@ fn prune_unneeded_availability<P>(client: Arc<P>, extrinsic_store: ExtrinsicStor
-> impl Future<Item=(),Error=()> + Send
where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
{
use codec::{Encode, Decode};
use polkadot_primitives::BlockId;

enum NotifyError {
BodyFetch(::client::error::Error),
}

impl NotifyError {
fn log(&self, hash: &::polkadot_primitives::Hash) {
match *self {
NotifyError::BodyFetch(ref err) => warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, err),
}
}
}

client.finality_notification_stream()
.for_each(move |notification| {
use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic};

let hash = notification.hash;
let parent_hash = notification.header.parent_hash;
let extrinsics = client.block_body(&BlockId::hash(hash))
.map_err(NotifyError::BodyFetch);

let extrinsics = match extrinsics {
Ok(r) => r,
Err(e) => { e.log(&hash); return Ok(()) }
};

let candidate_hashes = match extrinsics
.iter()
.filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice()))
.filter_map(|ex| match ex.function {
Call::Parachains(ParachainsCall::set_heads(ref heads)) =>
Some(heads.iter().map(|c| c.candidate.hash()).collect()),
_ => None,
})
.next()
{
Some(x) => x,
None => return Ok(()),
let candidate_hashes = match fetch_candidates(&*client, &BlockId::hash(hash)) {
Ok(Some(candidates)) => candidates.map(|c| c.hash()).collect(),
Ok(None) => {
warn!("Could not extract candidates from block body of imported block {:?}", hash);
return Ok(())
}
Err(e) => {
warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, e);
return Ok(())
}
};

if let Err(e) = extrinsic_store.candidates_finalized(parent_hash, candidate_hashes) {
Expand Down Expand Up @@ -125,6 +117,7 @@ pub(crate) fn start<C, N, P>(
P::Api: ParachainHost<Block> + Core<Block> + BlockBuilder<Block>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
<<N::TableRouter as TableRouter>::FetchIncoming as IntoFuture>::Future: Send + 'static,
{
const TIMER_DELAY: Duration = Duration::from_secs(5);
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
Expand All @@ -148,6 +141,7 @@ pub(crate) fn start<C, N, P>(
.and_then(|authorities| {
consensus.get_or_instantiate(
parent_hash,
notification.header.parent_hash().clone(),
&authorities,
key.clone(),
)
Expand Down
Loading

0 comments on commit 66c9580

Please sign in to comment.