Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
collator-protocol: asynchronous backing changes (#5740)
Browse files Browse the repository at this point in the history
* Draft collator side changes

* Start working on collations management

* Handle peer's view change

* Versioning on advertising

* Versioned collation fetching request

* Handle versioned messages

* Improve docs for collation requests

* Add spans

* Add request receiver to overseer

* Fix collator side tests

* Extract relay parent mode to lib

* Validator side draft

* Add more checks for advertisement

* Request pvd based on async backing mode

* review

* Validator side improvements

* Make old tests green

* More fixes

* Collator side tests draft

* Send collation test

* fmt

* Collator side network protocol versioning

* cleanup

* merge artifacts

* Validator side net protocol versioning

* Remove fragment tree membership request

* Resolve todo

* Collator side core state test

* Improve net protocol compatibility

* Validator side tests

* more improvements

* style fixes

* downgrade log

* Track implicit assignments

* Limit the number of seconded candidates per para

* Add a sanity check

* Handle fetched candidate

* fix tests

* Retry fetch

* Guard against dequeueing while already fetching

* Reintegrate connection management

* Timeout on advertisements

* fmt

* spellcheck

* update tests after merge
  • Loading branch information
slumber authored Oct 12, 2022
1 parent ca5e2fe commit c23032a
Show file tree
Hide file tree
Showing 24 changed files with 4,434 additions and 1,332 deletions.
10 changes: 8 additions & 2 deletions node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ async fn handle_new_activations<Context>(
"collation-builder",
Box::pin(async move {
let persisted_validation_data_hash = validation_data.hash();
let parent_head_data_hash = validation_data.parent_head.hash();

let (collation, result_sender) =
match (task_config.collator)(relay_parent, &validation_data).await {
Expand Down Expand Up @@ -385,8 +386,13 @@ async fn handle_new_activations<Context>(

if let Err(err) = task_sender
.send(
CollatorProtocolMessage::DistributeCollation(ccr, pov, result_sender)
.into(),
CollatorProtocolMessage::DistributeCollation(
ccr,
parent_head_data_hash,
pov,
result_sender,
)
.into(),
)
.await
{
Expand Down
162 changes: 162 additions & 0 deletions node/network/collator-protocol/src/collator_side/collation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Primitives for tracking collations-related data.

use std::collections::{HashSet, VecDeque};

use futures::{future::BoxFuture, stream::FuturesUnordered};

use polkadot_node_network_protocol::{
request_response::{
incoming::OutgoingResponse, v1 as protocol_v1, vstaging as protocol_vstaging,
IncomingRequest,
},
PeerId,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::v2::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};

/// The status of a collation as seen from the collator.
pub enum CollationStatus {
/// The collation was created, but we did not advertise it to any validator.
Created,
/// The collation was advertised to at least one validator.
Advertised,
/// The collation was requested by at least one validator.
Requested,
}

impl CollationStatus {
/// Advance to the [`Self::Advertised`] status.
///
/// This ensures that `self` isn't already [`Self::Requested`].
pub fn advance_to_advertised(&mut self) {
if !matches!(self, Self::Requested) {
*self = Self::Advertised;
}
}

/// Advance to the [`Self::Requested`] status.
pub fn advance_to_requested(&mut self) {
*self = Self::Requested;
}
}

/// A collation built by the collator.
pub struct Collation {
/// Candidate receipt.
pub receipt: CandidateReceipt,
/// Parent head-data hash.
pub parent_head_data_hash: Hash,
/// Proof to verify the state transition of the parachain.
pub pov: PoV,
/// Collation status.
pub status: CollationStatus,
}

/// Stores the state for waiting collation fetches per relay parent.
#[derive(Default)]
pub struct WaitingCollationFetches {
/// A flag indicating that we have an ongoing request.
/// This limits the number of collations being sent at any moment
/// of time to 1 for each relay parent.
///
/// If set to `true`, any new request will be queued.
pub collation_fetch_active: bool,
/// The collation fetches waiting to be fulfilled.
pub req_queue: VecDeque<VersionedCollationRequest>,
/// All peers that are waiting or actively uploading.
///
/// We will not accept multiple requests from the same peer, otherwise our DoS protection of
/// moving on to the next peer after `MAX_UNSHARED_UPLOAD_TIME` would be pointless.
pub waiting_peers: HashSet<(PeerId, CandidateHash)>,
}

/// Backwards-compatible wrapper for incoming collations requests.
pub enum VersionedCollationRequest {
V1(IncomingRequest<protocol_v1::CollationFetchingRequest>),
VStaging(IncomingRequest<protocol_vstaging::CollationFetchingRequest>),
}

impl From<IncomingRequest<protocol_v1::CollationFetchingRequest>> for VersionedCollationRequest {
fn from(req: IncomingRequest<protocol_v1::CollationFetchingRequest>) -> Self {
Self::V1(req)
}
}

impl From<IncomingRequest<protocol_vstaging::CollationFetchingRequest>>
for VersionedCollationRequest
{
fn from(req: IncomingRequest<protocol_vstaging::CollationFetchingRequest>) -> Self {
Self::VStaging(req)
}
}

impl VersionedCollationRequest {
/// Returns parachain id from the request payload.
pub fn para_id(&self) -> ParaId {
match self {
VersionedCollationRequest::V1(req) => req.payload.para_id,
VersionedCollationRequest::VStaging(req) => req.payload.para_id,
}
}

/// Returns relay parent from the request payload.
pub fn relay_parent(&self) -> Hash {
match self {
VersionedCollationRequest::V1(req) => req.payload.relay_parent,
VersionedCollationRequest::VStaging(req) => req.payload.relay_parent,
}
}

/// Returns id of the peer the request was received from.
pub fn peer_id(&self) -> PeerId {
match self {
VersionedCollationRequest::V1(req) => req.peer,
VersionedCollationRequest::VStaging(req) => req.peer,
}
}

/// Sends the response back to requester.
pub fn send_outgoing_response(
self,
response: OutgoingResponse<protocol_v1::CollationFetchingResponse>,
) -> Result<(), ()> {
match self {
VersionedCollationRequest::V1(req) => req.send_outgoing_response(response),
VersionedCollationRequest::VStaging(req) => req.send_outgoing_response(response),
}
}
}

/// Result of the finished background send-collation task.
///
/// Note that if the timeout was hit the request doesn't get
/// aborted, it only indicates that we should start processing
/// the next one from the queue.
pub struct CollationSendResult {
/// Candidate's relay parent.
pub relay_parent: Hash,
/// Candidate hash.
pub candidate_hash: CandidateHash,
/// Peer id.
pub peer_id: PeerId,
/// Whether the max unshared timeout was hit.
pub timed_out: bool,
}

pub type ActiveCollationFetches = FuturesUnordered<BoxFuture<'static, CollationSendResult>>;
4 changes: 2 additions & 2 deletions node/network/collator-protocol/src/collator_side/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
Expand All @@ -20,7 +20,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
pub fn on_advertisment_made(&self) {
pub fn on_advertisement_made(&self) {
if let Some(metrics) = &self.0 {
metrics.advertisements_made.inc();
}
Expand Down
Loading

0 comments on commit c23032a

Please sign in to comment.