Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Only send one collation per relay parent at a time to validators #3360

Merged
3 commits merged into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 112 additions & 22 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};
use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin};

use futures::{FutureExt, channel::oneshot};
use futures::{FutureExt, StreamExt, channel::oneshot, stream::FuturesUnordered, select, Future};
use sp_core::Pair;

use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, Id as ParaId};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash,
Id as ParaId,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, jaeger,
messages::{
AllMessages, CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeMessage,
},
};
use polkadot_node_network_protocol::{
OurView, PeerId, View, peer_set::PeerSet,
OurView, PeerId, UnifiedReputationChange as Rep, View, peer_set::PeerSet,
request_response::{
IncomingRequest,
v1::{CollationFetchingRequest, CollationFetchingResponse},
IncomingRequest, request::OutgoingResponse, v1::{CollationFetchingRequest, CollationFetchingResponse}
},
v1 as protocol_v1,
UnifiedReputationChange as Rep,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
Expand All @@ -59,6 +60,12 @@ impl Metrics {
}
}

fn on_collation_sent_requested(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_send_requested.inc();
}
}

fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
Expand All @@ -75,6 +82,7 @@ impl Metrics {
struct MetricsInner {
advertisements_made: prometheus::Counter<prometheus::U64>,
collations_sent: prometheus::Counter<prometheus::U64>,
collations_send_requested: prometheus::Counter<prometheus::U64>,
process_msg: prometheus::Histogram,
}

Expand All @@ -90,6 +98,13 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
collations_send_requested: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_requested_total",
"A number of collations requested to be sent to validators.",
)?,
registry,
)?,
collations_sent: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_total",
Expand Down Expand Up @@ -185,6 +200,17 @@ struct Collation {
status: CollationStatus,
}

/// Stores the state for waiting collation fetches.
#[derive(Default)]
struct WaitingCollationFetches {
/// Is there currently a collation getting fetched?
collation_fetch_active: bool,
/// The collation fetches waiting to be fulfilled.
waiting: VecDeque<IncomingRequest<CollationFetchingRequest>>,
}

type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = Hash> + Send + 'static>>>;

struct State {
/// Our network peer id.
local_peer_id: PeerId,
Expand Down Expand Up @@ -217,11 +243,23 @@ struct State {
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,

/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s by `PeerConnected` events.
/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s
/// by `PeerConnected` events.
peer_ids: HashMap<PeerId, AuthorityDiscoveryId>,

/// Metrics.
metrics: Metrics,

/// All collation fetching requests that are still waiting to be answered.
///
/// They are stored per relay parent, when our view changes and the relay parent moves out, we will cancel the fetch
/// request.
waiting_collation_fetches: HashMap<Hash, WaitingCollationFetches>,

/// Active collation fetches.
///
/// Each future returns the relay parent of the finished collation fetch.
active_collation_fetches: ActiveCollationFetches,
}

impl State {
Expand All @@ -240,6 +278,8 @@ impl State {
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
peer_ids: Default::default(),
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
}

Expand Down Expand Up @@ -349,8 +389,9 @@ async fn distribute_collation(

state.collations.insert(relay_parent, Collation { receipt, pov, status: CollationStatus::Created });

let interested = state.peers_interested_in_leaf(&relay_parent);
// Make sure already connected peers get collations:
for peer_id in state.peers_interested_in_leaf(&relay_parent) {
for peer_id in interested {
advertise_collation(ctx, state, relay_parent, peer_id).await;
}

Expand All @@ -373,6 +414,7 @@ async fn determine_core(
}
}
}

Ok(None)
}

Expand Down Expand Up @@ -455,7 +497,7 @@ async fn declare(
async fn connect_to_validators(
ctx: &mut impl SubsystemContext,
validator_ids: Vec<AuthorityDiscoveryId>,
) {
) {
// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
Expand Down Expand Up @@ -607,8 +649,18 @@ async fn process_msg(
return Ok(());
};

state.metrics.on_collation_sent_requested();

let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(state, incoming, receipt, pov).await;

let waiting = state.waiting_collation_fetches.entry(incoming.payload.relay_parent).or_default();

if waiting.collation_fetch_active {
waiting.waiting.push_back(incoming);
} else {
waiting.collation_fetch_active = true;
send_collation(state, incoming, receipt, pov).await;
}
} else {
tracing::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -640,12 +692,28 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) {
let (tx, rx) = oneshot::channel();

let relay_parent = request.payload.relay_parent;

let response = OutgoingResponse {
result: Ok(CollationFetchingResponse::Collation(receipt, pov)),
reputation_changes: Vec::new(),
sent_feedback: Some(tx),
};

if let Err(_) = request.send_outgoing_response(response) {
tracing::warn!(
target: LOG_TARGET,
"Sending collation response failed",
);
}

state.active_collation_fetches.push(async move {
let _ = rx.await;
relay_parent
}.boxed());

state.metrics.on_collation_sent();
}

Expand Down Expand Up @@ -840,6 +908,7 @@ async fn handle_our_view_change(
}
state.our_validators_groups.remove(removed);
state.span_per_relay_parent.remove(removed);
state.waiting_collation_fetches.remove(removed);
}

state.view = view;
Expand All @@ -861,17 +930,38 @@ pub(crate) async fn run(
let mut runtime = RuntimeInfo::new(None);

loop {
let msg = ctx.recv().fuse().await.map_err(Fatal::SubsystemReceive)?;
match msg {
Communication { msg } => {
log_error(
process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
"Failed to process message"
)?;
select! {
msg = ctx.recv().fuse() => match msg.map_err(Fatal::SubsystemReceive)? {
Communication { msg } => {
log_error(
process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
"Failed to process message"
)?;
},
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(..)) => {}
Signal(Conclude) => return Ok(()),
},
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(..)) => {}
Signal(Conclude) => return Ok(()),
relay_parent = state.active_collation_fetches.select_next_some() => {
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
if let Some(next) = waiting.waiting.pop_front() {
next
} else {
waiting.collation_fetch_active = false;
continue
}
} else {
// No waiting collation fetches means we already removed the relay parent from our view.
continue
};

if let Some(collation) = state.collations.get(&relay_parent) {
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();

send_collation(&mut state, next, receipt, pov).await;
}
}
}
}
}
Loading