Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Make sure we send the validator key to collators on status #968

Merged
merged 1 commit into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions network/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ enum BackgroundToWorkerMsg {
}

/// Operations that a handle to an underlying network service should provide.
trait NetworkServiceOps: Send + Sync {
pub trait NetworkServiceOps: Send + Sync {
/// Report the peer as having a particular positive or negative value.
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);

Expand Down Expand Up @@ -193,10 +193,18 @@ impl GossipOps for RegisteredMessageValidator {
}

/// An async handle to the network service.
#[derive(Clone)]
pub struct Service {
pub struct Service<N = PolkadotNetworkService> {
sender: mpsc::Sender<ServiceToWorkerMsg>,
network_service: Arc<dyn NetworkServiceOps>,
network_service: Arc<N>,
}

impl<N> Clone for Service<N> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
network_service: self.network_service.clone(),
}
}
}

/// Registers the protocol.
Expand All @@ -209,7 +217,7 @@ pub fn start<C, Api, SP>(
chain_context: C,
api: Arc<Api>,
executor: SP,
) -> Result<Service, futures::task::SpawnError> where
) -> Result<Service<PolkadotNetworkService>, futures::task::SpawnError> where
C: ChainContext + 'static,
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Expand Down Expand Up @@ -292,14 +300,14 @@ pub fn start<C, Api, SP>(
}

/// The Polkadot protocol status message.
#[derive(Debug, Encode, Decode)]
#[derive(Debug, Encode, Decode, PartialEq)]
pub struct Status {
version: u32, // protocol version.
collating_for: Option<(CollatorId, ParaId)>,
}

/// Polkadot-specific messages from peer to peer.
#[derive(Debug, Encode, Decode)]
#[derive(Debug, Encode, Decode, PartialEq)]
pub enum Message {
/// Exchange status with a peer. This should be the first message sent.
#[codec(index = "0")]
Expand Down Expand Up @@ -451,6 +459,11 @@ impl RecentValidatorIds {
fn as_slice(&self) -> &[ValidatorId] {
&*self.inner
}

/// Returns the last inserted session key.
fn latest(&self) -> Option<&ValidatorId> {
self.inner.last()
}
}

struct ProtocolHandler {
Expand Down Expand Up @@ -582,7 +595,19 @@ impl ProtocolHandler {
let role = self.collators
.on_new_collator(collator_id, para_id, remote.clone());
let service = &self.service;
let send_key = peer.should_send_key();

if let Some(c_state) = peer.collator_state_mut() {
if send_key {
if let Some(key) = self.local_keys.latest() {
c_state.send_key(key.clone(), |msg| service.write_notification(
remote.clone(),
POLKADOT_ENGINE_ID,
msg.encode(),
));
}
}

c_state.set_role(role, |msg| service.write_notification(
remote.clone(),
POLKADOT_ENGINE_ID,
Expand Down Expand Up @@ -1323,7 +1348,7 @@ struct RouterInner {
sender: mpsc::Sender<ServiceToWorkerMsg>,
}

impl Service {
impl<N: NetworkServiceOps> Service<N> {
/// Register an availablility-store that the network can query.
pub fn register_availability_store(&self, store: av_store::Store) {
let _ = self.sender.clone()
Expand Down Expand Up @@ -1373,7 +1398,7 @@ impl Service {
}
}

impl ParachainNetwork for Service {
impl<N> ParachainNetwork for Service<N> {
type Error = mpsc::SendError;
type TableRouter = Router;
type BuildTableRouter = Pin<Box<dyn Future<Output=Result<Router,Self::Error>> + Send>>;
Expand Down Expand Up @@ -1403,7 +1428,7 @@ impl ParachainNetwork for Service {
}
}

impl Collators for Service {
impl<N> Collators for Service<N> {
type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
type Collation = Pin<Box<dyn Future<Output = Result<Collation, Self::Error>> + Send>>;

Expand All @@ -1425,7 +1450,7 @@ impl Collators for Service {
}
}

impl av_store::ErasureNetworking for Service {
impl<N> av_store::ErasureNetworking for Service<N> {
type Error = future::Either<mpsc::SendError, oneshot::Canceled>;

fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32)
Expand Down
38 changes: 33 additions & 5 deletions network/src/protocol/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;

#[derive(Default)]
struct MockNetworkOps {
pub struct MockNetworkOps {
recorded: Mutex<Recorded>,
}

Expand Down Expand Up @@ -188,7 +188,7 @@ sp_api::mock_impl_runtime_apis! {
}
}

impl super::Service {
impl super::Service<MockNetworkOps> {
async fn connect_peer(&mut self, peer: PeerId, roles: Roles) {
self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap();
}
Expand Down Expand Up @@ -222,7 +222,7 @@ impl super::Service {
}

fn test_setup(config: Config) -> (
Service,
Service<MockNetworkOps>,
MockGossip,
LocalPool,
impl Future<Output = ()> + 'static,
Expand Down Expand Up @@ -264,7 +264,7 @@ fn worker_task_shuts_down_when_sender_dropped() {
/// is handled. This helper functions checks multiple times that the given instance is dropped. Even
/// if the first round fails, the second one should be successful as the consensus instance drop
/// should be already handled this time.
fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) {
fn wait_for_instance_drop(service: &mut Service<MockNetworkOps>, pool: &mut LocalPool, instance: Hash) {
let mut try_counter = 0;
let max_tries = 3;

Expand Down Expand Up @@ -363,7 +363,6 @@ fn collation_is_received_with_dropped_router() {
})));
}


#[test]
fn validator_peer_cleaned_up() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
Expand Down Expand Up @@ -575,3 +574,32 @@ fn fetches_pov_block_from_gossip() {

pool.run_until(test_work).unwrap();
}

#[test]
fn validator_sends_key_to_collator_on_status() {
let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });

let peer = PeerId::random();
let peer_clone = peer.clone();
let validator_key = Sr25519Keyring::Alice.pair();
let validator_id = ValidatorId::from(validator_key.public());
let validator_id_clone = validator_id.clone();
let collator_id = CollatorId::from(Sr25519Keyring::Bob.public());
let para_id = ParaId::from(100);
let mut service_clone = service.clone();

pool.spawner().spawn_local(worker_task).unwrap();
pool.run_until(async move {
service_clone.synchronize(move |proto| { proto.local_keys.insert(validator_id_clone); }).await;
service_clone.connect_peer(peer_clone.clone(), Roles::AUTHORITY).await;
service_clone.peer_message(peer_clone.clone(), Message::Status(Status {
version: VERSION,
collating_for: Some((collator_id, para_id)),
})).await;
});

let expected_msg = Message::ValidatorId(validator_id.clone());
assert!(service.network_service.recorded.lock().notifications.iter().any(|(p, notification)| {
peer == *p && *notification == expected_msg
}));
}
8 changes: 3 additions & 5 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Duration;
use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance};
#[cfg(feature = "full-node")]
use polkadot_network::{legacy::gossip::Known, protocol as network_protocol};
use service::{error::{Error as ServiceError}, ServiceBuilder};
use service::{error::Error as ServiceError, ServiceBuilder};
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use inherents::InherentDataProviders;
use sc_executor::native_executor_instance;
Expand Down Expand Up @@ -103,11 +103,9 @@ where
<Self as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
{}

pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static
{}
pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static {}

impl<E> RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static
{}
impl<E> RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static {}

/// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network.
pub trait IsKusama {
Expand Down