Skip to content

Commit

Permalink
Authorship works again (paritytech#50)
Browse files Browse the repository at this point in the history
* provide through inherent-data when authoring

* remove unneeded codec round-trip in proposer

* refactor polkadot-consensus service architecture

* integrate block authorship into polkadot service

* remove unused extern substrate-network crate in service

* write wrapper for unifying errors in consensus proposer

* extend wrapper further

* switch temporarily to macro-changing branch

* runtime compiles

* implement `inherent_extrinsics` for runtime

* block authorship works

* add GRANDPA to polkadot runtime

* get everything compiling

* use substrate master branch again

* remove some unneeded params

* update WASM

* parse only extrinsics when pruning availability store

* update recent deps

* runtime almost compiles

* need to expose trait type in build : I had to put phantomdata manually.

* finish updating authorship to latest GRANDPA and Aura

* fix tests

* update wasm
  • Loading branch information
rphmeier authored Dec 11, 2018
1 parent 6430a00 commit 13b58b1
Show file tree
Hide file tree
Showing 20 changed files with 1,224 additions and 927 deletions.
724 changes: 468 additions & 256 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions availability-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ parking_lot = "0.4"
log = "0.3"
parity-codec = "2.1"
substrate-primitives = { git = "https://github.com/paritytech/substrate" }
kvdb = { git = "https://github.com/paritytech/parity-common.git" }
kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common.git" }
kvdb-memorydb = { git = "https://github.com/paritytech/parity-common.git" }
kvdb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
kvdb-memorydb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
2 changes: 1 addition & 1 deletion availability-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn extrinsic_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
/// Handle to the availability store.
#[derive(Clone)]
pub struct Store {
inner: Arc<KeyValueDB>,
inner: Arc<dyn KeyValueDB>,
}

impl Store {
Expand Down
2 changes: 1 addition & 1 deletion cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub fn run<I, T, W>(args: I, worker: W, version: cli::VersionInfo) -> error::Res

let (spec, mut config) = cli::parse_matches::<service::Factory, _>(load_spec, version, "parity-polkadot", &matches)?;

match cli::execute_default::<service::Factory, _,>(spec, worker, &matches)? {
match cli::execute_default::<service::Factory, _,>(spec, worker, &matches, &config)? {
cli::Action::ExecutedInternally => (),
cli::Action::RunService(worker) => {
info!("Parity ·:· Polkadot");
Expand Down
1 change: 0 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ polkadot-parachain = { path = "../parachain" }
polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" }
polkadot-statement-table = { path = "../statement-table" }
substrate-consensus-aura = { git = "https://github.com/paritytech/substrate" }
substrate-finality-grandpa = { git = "https://github.com/paritytech/substrate" }
substrate-consensus-common = { git = "https://github.com/paritytech/substrate" }
substrate-primitives = { git = "https://github.com/paritytech/substrate" }
Expand Down
216 changes: 216 additions & 0 deletions consensus/src/attestation_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Attestation service.

/// Attestation service. A long running service that creates and manages parachain attestation
/// instances.
///
/// This uses a handle to an underlying thread pool to dispatch heavy work
/// such as candidate verification while performing event-driven work
/// on a local event loop.

use std::thread;
use std::time::{Duration, Instant};
use std::sync::Arc;

use client::{BlockchainEvents, ChainHead, BlockBody};
use client::block_builder::api::BlockBuilder;
use client::blockchain::HeaderBackend;
use client::runtime_api::Core;
use primitives::ed25519;
use futures::prelude::*;
use polkadot_primitives::{Block, BlockId, InherentData};
use polkadot_primitives::parachain::ParachainHost;
use extrinsic_store::Store as ExtrinsicStore;
use runtime_primitives::traits::ProvideRuntimeApi;

use tokio::runtime::TaskExecutor;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;

use super::{Network, Collators};

// creates a task to prune redundant entries in availability store upon block finalization
//
// NOTE: this will need to be changed to finality notification rather than
// block import notifications when the consensus switches to non-instant finality.
fn prune_unneeded_availability<P>(client: Arc<P>, extrinsic_store: ExtrinsicStore)
-> impl Future<Item=(),Error=()> + Send
where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
{
use codec::{Encode, Decode};
use polkadot_primitives::BlockId;

enum NotifyError {
BodyFetch(::client::error::Error),
}

impl NotifyError {
fn log(&self, hash: &::polkadot_primitives::Hash) {
match *self {
NotifyError::BodyFetch(ref err) => warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, err),
}
}
}

client.finality_notification_stream()
.for_each(move |notification| {
use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic};

let hash = notification.hash;
let parent_hash = notification.header.parent_hash;
let extrinsics = client.block_body(&BlockId::hash(hash))
.map_err(NotifyError::BodyFetch);

let extrinsics = match extrinsics {
Ok(r) => r,
Err(e) => { e.log(&hash); return Ok(()) }
};

let candidate_hashes = match extrinsics
.iter()
.filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice()))
.filter_map(|ex| match ex.function {
Call::Parachains(ParachainsCall::set_heads(ref heads)) =>
Some(heads.iter().map(|c| c.candidate.hash()).collect()),
_ => None,
})
.next()
{
Some(x) => x,
None => return Ok(()),
};

if let Err(e) = extrinsic_store.candidates_finalized(parent_hash, candidate_hashes) {
warn!(target: "consensus", "Failed to prune unneeded available data: {:?}", e);
}

Ok(())
})
}

/// Parachain candidate attestation service handle.
pub(crate) struct ServiceHandle {
thread: Option<thread::JoinHandle<()>>,
exit_signal: Option<::exit_future::Signal>,
}

/// Create and start a new instance of the attestation service.
pub(crate) fn start<C, N, P>(
client: Arc<P>,
parachain_consensus: Arc<::ParachainConsensus<C, N, P>>,
thread_pool: TaskExecutor,
key: Arc<ed25519::Pair>,
extrinsic_store: ExtrinsicStore,
) -> ServiceHandle
where
C: Collators + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
P: BlockchainEvents<Block> + ChainHead<Block> + BlockBody<Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + Core<Block> + BlockBuilder<Block, InherentData>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
{
const TIMER_DELAY: Duration = Duration::from_secs(5);
const TIMER_INTERVAL: Duration = Duration::from_secs(30);

let (signal, exit) = ::exit_future::signal();
let thread = thread::spawn(move || {
let mut runtime = LocalRuntime::new().expect("Could not create local runtime");
let notifications = {
let client = client.clone();
let consensus = parachain_consensus.clone();
let key = key.clone();

client.import_notification_stream().for_each(move |notification| {
let parent_hash = notification.hash;
if notification.is_new_best {
let res = client
.runtime_api()
.authorities(&BlockId::hash(parent_hash))
.map_err(Into::into)
.and_then(|authorities| {
consensus.get_or_instantiate(
parent_hash,
&authorities,
key.clone(),
)
});

if let Err(e) = res {
warn!("Unable to start parachain consensus on top of {:?}: {}",
parent_hash, e);
}
}
Ok(())
})
};

let prune_old_sessions = {
let client = client.clone();
let interval = Interval::new(
Instant::now() + TIMER_DELAY,
TIMER_INTERVAL,
);

interval
.for_each(move |_| match client.leaves() {
Ok(leaves) => {
parachain_consensus.retain(|h| leaves.contains(h));
Ok(())
}
Err(e) => {
warn!("Error fetching leaves from client: {:?}", e);
Ok(())
}
})
.map_err(|e| warn!("Timer error {:?}", e))
};

runtime.spawn(notifications);
thread_pool.spawn(prune_old_sessions);

let prune_available = prune_unneeded_availability(client, extrinsic_store)
.select(exit.clone())
.then(|_| Ok(()));

// spawn this on the tokio executor since it's fine on a thread pool.
thread_pool.spawn(prune_available);

if let Err(e) = runtime.block_on(exit) {
debug!("BFT event loop error {:?}", e);
}
});

ServiceHandle {
thread: Some(thread),
exit_signal: Some(signal),
}
}

impl Drop for ServiceHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
}

if let Some(thread) = self.thread.take() {
thread.join().expect("The service thread has panicked");
}
}
}
3 changes: 0 additions & 3 deletions consensus/src/dynamic_inclusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ impl DynamicInclusion {
Some(now + until)
}
}

/// Get the start instant.
pub fn started_at(&self) -> Instant { self.start }
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 13b58b1

Please sign in to comment.