
slimify
drahnr committed Jul 14, 2020
1 parent 9012e2c commit 9a7cb2d
Showing 1 changed file with 46 additions and 73 deletions.
119 changes: 46 additions & 73 deletions node/network/bitfield-distribution/src/lib.rs
@@ -16,16 +16,16 @@

//! The bitfield distribution subsystem, which gossips signed availability bitfields among nodes.
use codec::{Codec, Decode, Encode};
use codec::{Decode, Encode};
use futures::{
channel::oneshot,
future::{abortable, AbortHandle, Abortable},
stream::{FuturesUnordered, Stream, StreamExt},
Future, FutureExt,
};

use node_primitives::{ProtocolId, SignedFullStatement, View};
use polkadot_network::protocol::Message;
use node_primitives::{ProtocolId, View};

use polkadot_node_subsystem::messages::*;
use polkadot_node_subsystem::{
FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
@@ -43,13 +43,12 @@ const COST_SIGNATURE_INVALID: ReputationChange =
ReputationChange::new(-100, "Bitfield signature invalid");
const COST_MISSING_PEER_SESSION_KEY: ReputationChange =
ReputationChange::new(-133, "Missing peer session key");
const COST_MULTIPLE_BITFIELDS_FROM_PEER: ReputationChange =
	ReputationChange::new(-22, "Received more than one bitfield from peer");
const COST_NOT_INTERESTED: ReputationChange =
	ReputationChange::new(-51, "Not interested in that parent hash");
const COST_MESSAGE_NOT_DECODABLE: ReputationChange =
	ReputationChange::new(-100, "Message could not be decoded");
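
// Not part of this diff: the call sites below use a `modify_reputiation`
// helper defined elsewhere in this file. A minimal sketch of what it is
// assumed to do — forward a `ReputationChange` for a peer to the network
// bridge; the exact signature and the `NetworkBridgeMessage::ReportPeer`
// variant are assumptions here.
async fn modify_reputiation_sketch<Context>(
	mut ctx: Context,
	peerid: PeerId,
	rep: ReputationChange,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	// Hand the reputation change to the network bridge, which applies it.
	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::ReportPeer(peerid, rep),
	))
	.await
}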


#[derive(Default, Clone)]
struct Tracker {
// track all active peers and their views
@@ -59,16 +58,13 @@ struct Tracker {
// our current view
view: View,

// signing context for a particular relay_parent
jobs: HashMap<Hash, JobData>,

// set of validators for a particular relay_parent
per_job: HashMap<Hash, JobData>,
per_relay_parent: HashMap<Hash, PerRelayParentData>,
}

/// Data for each relay parent
#[derive(Debug, Clone, Default)]
struct JobData {
struct PerRelayParentData {
// set of validators which already sent a message
validator_bitset_received: HashSet<ValidatorId>,

@@ -80,7 +76,7 @@ struct JobData {

// set of validators for a particular relay_parent and the number of messages
// received authored by them
received_bitsets_per_validator: HashSet<ValidatorId, usize>,
one_per_validator: HashSet<ValidatorId>,
}

fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
@@ -104,8 +100,6 @@ impl BitfieldDistribution {

// set of active heads the overseer told us to work on with the connected
// tasks abort handles
let mut active_jobs = HashMap::<Hash, AbortHandle>::new();
let mut futurama = HashMap::<Hash, Pin<Box<dyn Future<Output = ()>>>>::new();
let mut tracker = Tracker::default();
loop {
{
@@ -126,6 +120,8 @@
relay_parent: hash,
signed_availability,
};
// @todo make sure we do not send it twice if we are the source
// @todo does this also apply to ourselves?
distribute(ctx.clone(), &mut tracker, msg).await?;
}
BitfieldDistributionMessage::NetworkBridgeUpdate(event) => {
@@ -137,9 +133,9 @@
let (validator_set, signing_context) =
query_basics(ctx.clone(), relay_parent).await?;

let _ = tracker.per_job.insert(
let _ = tracker.per_relay_parent.insert(
relay_parent,
JobData {
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
@@ -149,30 +145,10 @@
// active_jobs.insert(relay_parent.clone(), abort_handle);
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {
// if let Some(abort_handle) = active_jobs.remove(&relay_parent) {
// let _ = abort_handle.abort();
// }
// let _ = futurama.remove(&relay_parent);
let _ = tracker.per_job.remove(&relay_parent);
let _ = tracker.per_relay_parent.remove(&relay_parent);
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
tracker.per_job.clear();
// let unordered = futurama
// .drain()
// .map(|(_relay_parent, future)| future)
// .zip(
// active_jobs
// .drain()
// .map(|(_relay_parent, abort_handle)| abort_handle),
// )
// .map(|(future, abort_handle)| {
// abort_handle.abort();
// // TODO pipe to cleanup state
// future
// })
// .collect::<FuturesUnordered<_>>();

// let _ = async move { unordered.into_future().await }.await;
tracker.per_relay_parent.clear();
return Ok(());
}
}
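
// Not part of this diff: `query_basics`, used by the StartWork arm above, is
// defined further down in this file. A sketch of what it is assumed to do —
// ask the runtime API for the validator set and the signing context of the
// given relay parent; the exact `RuntimeApiRequest` variants are assumptions.
async fn query_basics_sketch<Context>(
	mut ctx: Context,
	relay_parent: Hash,
) -> SubsystemResult<(Vec<ValidatorId>, SigningContext)>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	let (validators_tx, validators_rx) = oneshot::channel();
	let (context_tx, context_rx) = oneshot::channel();
	ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::Validators(validators_tx),
	)))
	.await?;
	ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::SigningContext(context_tx),
	)))
	.await?;
	// A real implementation should handle cancellation instead of panicking.
	let validator_set = validators_rx.await.expect("runtime API request dropped");
	let signing_context = context_rx.await.expect("runtime API request dropped");
	Ok((validator_set, signing_context))
}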
@@ -207,6 +183,7 @@ async fn distribute<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{

let BitfieldGossipMessage {
relay_parent,
signed_availability,
@@ -236,48 +213,52 @@ where

/// Handle an incoming message from a peer
async fn process_incoming_peer_message<Context>(
mut ctx: Context,
ctx: Context,
tracker: &mut Tracker,
peerid: PeerId,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
let peer_view = if let Some(peer_view) = tracker.peer_views.get(&peerid) {
peer_view
} else {
// we don't care about this, not part of our view
if !tracker.view.contains(&message.relay_parent) {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
};
}

let job_data = if let Some(job_data) = tracker.per_job.get(&message.relay_parent) {
// Ignore anything the overseer did not tell this subsystem to work on
let mut job_data = tracker.per_relay_parent.get_mut(&message.relay_parent);
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
};

// @todo should we only distribute availability messages to a peer if they are relevant to us,
// or is the only discriminator whether the peer cares about them?
if !peer_view.contains(&message.relay_parent) {
// we don't care about this, the other side should have known better
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
}

let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
return modify_reputiation(ctx, peerid, COST_MISSING_PEER_SESSION_KEY).await;
}

// check all validators that could have signed this message
if validator_set.iter().find(|validator| {
// @todo there must be a better way of figuring this out cheaply
let signing_context = job_data.signing_context.clone();
if let Some(validator) = validator_set.iter().find(|validator| {
message
.signed_availability
.check_signature(&job_data.signing_context, validator)
.check_signature(&signing_context, validator)
.is_ok()
}).is_none() {
}) {
let one_per_validator = &mut (job_data.one_per_validator);
// only distribute a message of a validator once
if one_per_validator.contains(validator) {
return Ok(())
}
one_per_validator.insert(validator.clone());
} else {
return modify_reputiation(ctx, peerid, COST_SIGNATURE_INVALID).await;
}

// passed all conditions, distribute!
distribute(ctx, tracker, message).await?;

Ok(())
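
// Not part of this diff: the body of `distribute` is largely elided in this
// view. A sketch of the behavior implied by its call sites — send the
// SCALE-encoded gossip to every connected peer whose view contains the
// message's relay parent. The protocol id constant and the
// `NetworkBridgeMessage::SendMessage` variant are assumptions here.
const BITFIELD_GOSSIP_PROTOCOL_SKETCH: ProtocolId = *b"bitd"; // hypothetical id
async fn distribute_sketch<Context>(
	mut ctx: Context,
	tracker: &mut Tracker,
	message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
	// Select only peers that advertised interest in this relay parent.
	let interested: Vec<PeerId> = tracker
		.peer_views
		.iter()
		.filter(|(_peer, view)| view.contains(&message.relay_parent))
		.map(|(peer, _view)| peer.clone())
		.collect();
	let bytes = message.encode();
	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::SendMessage(interested, BITFIELD_GOSSIP_PROTOCOL_SKETCH, bytes),
	))
	.await
}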
@@ -292,15 +273,13 @@ pub struct BitfieldGossipMessage {

/// Deal with network bridge updates and track what needs to be tracked
async fn handle_network_msg<Context>(
mut ctx: Context,
ctx: Context,
tracker: &mut Tracker,
bridge_message: NetworkBridgeEvent,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
let peer_views = &mut tracker.peer_views;
let per_job = &mut tracker.per_job;
let ego = &((*tracker).view);
match bridge_message {
NetworkBridgeEvent::PeerConnected(peerid, _role) => {
@@ -318,29 +297,18 @@ where
.and_modify(|val| *val = view);
}
NetworkBridgeEvent::OurViewChange(view) => {
let ego = ego.clone();
let old_view = std::mem::replace(&mut (tracker.view), view);
tracker
.per_job
.retain(move |hash, _job_data| ego.0.contains(hash));

for new in tracker.view.difference(&old_view) {
if !tracker.per_job.contains_key(&new) {
log::warn!("Active head running that's not active anymore, go catch it")
//@todo rephrase
//@todo should we get rid of that right here
if !tracker.per_relay_parent.contains_key(&new) {
log::warn!("Our view contains {} but the overseer never told use we should work on this", &new);
}
}
}
NetworkBridgeEvent::PeerMessage(remote, mut bytes) => {
NetworkBridgeEvent::PeerMessage(remote, bytes) => {
log::info!("Got a peer message from {:?}", &remote);
if let Ok(gossiped_bitfield) = BitfieldGossipMessage::decode(&mut (bytes.as_slice())) {
let (future, _abort_handle) = abortable(process_incoming_peer_message(ctx, tracker, remote, gossiped_bitfield));
future.await;
// tracker.active_jobs.insert(&gossiped_bitfield.relay_parent, abort_handle);
// let _future = ctx.spawn(Box::pin(async move {
// future.map(|_| ()).await
// }));
process_incoming_peer_message(ctx, tracker, remote, gossiped_bitfield).await?;
} else {
return modify_reputiation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await;
}
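
// Not part of this diff: peers exchange `BitfieldGossipMessage` as
// SCALE-encoded bytes, which is why the branch above decodes the raw payload.
// A minimal round-trip sketch using the `codec` traits imported at the top:
fn scale_roundtrip_sketch(msg: &BitfieldGossipMessage) -> Option<BitfieldGossipMessage> {
	// Encode to SCALE bytes as the sender does...
	let bytes = msg.encode();
	// ...and decode them again as the `PeerMessage` branch does on receipt.
	BitfieldGossipMessage::decode(&mut bytes.as_slice()).ok()
}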
@@ -389,9 +357,14 @@ where
mod test {
use super::*;

fn generate_valid_message() -> AllMessages {}
fn generate_valid_message() -> AllMessages {
// AllMessages::BitfieldDistribution(BitfieldDistributionMessage::DistributeBitfield())
unimplemented!()
}

fn generate_invalid_message() -> AllMessages {}
fn generate_invalid_message() -> AllMessages {
unimplemented!()
}
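
// Not part of this diff: one possible shape for `generate_valid_message`,
// assuming a hypothetical `test_signed_bitfield` helper. Producing a real
// `SignedAvailabilityBitfield` requires a validator key and a signing
// context, which is out of scope for this sketch.
fn test_signed_bitfield() -> SignedAvailabilityBitfield {
	unimplemented!()
}

fn generate_valid_message_sketch(relay_parent: Hash) -> AllMessages {
	AllMessages::BitfieldDistribution(BitfieldDistributionMessage::DistributeBitfield(
		relay_parent,
		test_signed_bitfield(),
	))
}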

#[test]
fn game_changer() {
