Skip to content

Commit

Permalink
Do not force collators to update after enabling async backing (#1920)
Browse files Browse the repository at this point in the history
The validators are checking if async backing is enabled by checking the
version of the runtime api. If the runtime api is upgraded by a runtime
upgrade, the validators start to also enable the async backing logic.
However, just because async backing is enabled, it doesn't mean that all
collators and parachain runtimes have upgraded. This pull request fixes
an issue about advertising collations to the relay chain when it has
async backing enabled, but the collator is still using the old
networking protocol. The implementation is actually backwards compatible
as we can not expect that everyone directly upgrades. However, the
collation advertisement logic was requiring V2 networking messages after
async backing was enabled, which was wrong. This is now fixed by this
pull request.

Closes: #1923

---------

Co-authored-by: eskimor <eskimor@users.noreply.github.com>
  • Loading branch information
bkchr and eskimor authored Oct 19, 2023
1 parent 961a8fd commit b967ba5
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};

use futures::{future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::{
peer_set::CollationVersion,
request_response::{outgoing::RequestError, v1 as request_v1, OutgoingResult},
PeerId,
};
Expand Down Expand Up @@ -160,6 +161,8 @@ pub fn fetched_collation_sanity_check(
pub struct CollationEvent {
/// Collator id.
pub collator_id: CollatorId,
/// The network protocol version the collator is using.
pub collator_protocol_version: CollationVersion,
/// The requested collation data.
pub pending_collation: PendingCollation,
}
Expand Down Expand Up @@ -307,6 +310,8 @@ pub(super) struct CollationFetchRequest {
pub pending_collation: PendingCollation,
/// Collator id.
pub collator_id: CollatorId,
/// The network protocol version the collator is using.
pub collator_protocol_version: CollationVersion,
/// Responses from collator.
pub from_collator: BoxFuture<'static, OutgoingResult<request_v1::CollationFetchingResponse>>,
/// Handle used for checking if this request was cancelled.
Expand Down Expand Up @@ -334,6 +339,7 @@ impl Future for CollationFetchRequest {
self.span.as_mut().map(|s| s.add_string_tag("success", "false"));
return Poll::Ready((
CollationEvent {
collator_protocol_version: self.collator_protocol_version,
collator_id: self.collator_id.clone(),
pending_collation: self.pending_collation,
},
Expand All @@ -344,6 +350,7 @@ impl Future for CollationFetchRequest {
let res = self.from_collator.poll_unpin(cx).map(|res| {
(
CollationEvent {
collator_protocol_version: self.collator_protocol_version,
collator_id: self.collator_id.clone(),
pending_collation: self.pending_collation,
},
Expand Down
161 changes: 95 additions & 66 deletions polkadot/node/network/collator-protocol/src/validator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature");
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for the wrong para");
const COST_PROTOCOL_MISUSE: Rep =
Rep::Malicious("A collator advertising a collation for an async backing relay parent using V1");
const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected");
const BENEFIT_NOTIFY_GOOD: Rep =
Rep::BenefitMinor("A collator was noted good by another subsystem");
Expand Down Expand Up @@ -144,9 +146,6 @@ enum InsertAdvertisementError {
UndeclaredCollator,
/// A limit for announcements per peer is reached.
PeerLimitReached,
/// Mismatch of relay parent mode and advertisement arguments.
/// An internal error that should not happen.
ProtocolMismatch,
}

#[derive(Debug)]
Expand Down Expand Up @@ -252,23 +251,41 @@ impl PeerData {
},
(
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. },
Some(candidate_hash),
candidate_hash,
) => {
if state
.advertisements
.get(&on_relay_parent)
.map_or(false, |candidates| candidates.contains(&candidate_hash))
{
return Err(InsertAdvertisementError::Duplicate)
}
let candidates = state.advertisements.entry(on_relay_parent).or_default();

if candidates.len() > max_candidate_depth {
return Err(InsertAdvertisementError::PeerLimitReached)
}
candidates.insert(candidate_hash);
if let Some(candidate_hash) = candidate_hash {
if state
.advertisements
.get(&on_relay_parent)
.map_or(false, |candidates| candidates.contains(&candidate_hash))
{
return Err(InsertAdvertisementError::Duplicate)
}

let candidates =
state.advertisements.entry(on_relay_parent).or_default();

if candidates.len() > max_candidate_depth {
return Err(InsertAdvertisementError::PeerLimitReached)
}
candidates.insert(candidate_hash);
} else {
if self.version != CollationVersion::V1 {
gum::error!(
target: LOG_TARGET,
"Programming error, `candidate_hash` can not be `None` \
for non `V1` networking.",
);
}

if state.advertisements.contains_key(&on_relay_parent) {
return Err(InsertAdvertisementError::Duplicate)
}
state
.advertisements
.insert(on_relay_parent, HashSet::from_iter(candidate_hash));
};
},
_ => return Err(InsertAdvertisementError::ProtocolMismatch),
}

state.last_active = Instant::now();
Expand Down Expand Up @@ -705,6 +722,7 @@ async fn request_collation(
let collation_request = CollationFetchRequest {
pending_collation,
collator_id: collator_id.clone(),
collator_protocol_version: peer_protocol_version,
from_collator: response_recv.boxed(),
cancellation_token: cancellation_token.clone(),
span: state
Expand Down Expand Up @@ -920,10 +938,11 @@ enum AdvertisementError {
UndeclaredCollator,
/// We're assigned to a different para at the given relay parent.
InvalidAssignment,
/// An advertisement format doesn't match the relay parent.
ProtocolMismatch,
/// Para reached a limit of seconded candidates for this relay parent.
SecondedLimitReached,
/// Collator trying to advertise a collation using V1 protocol for an async backing relay
/// parent.
ProtocolMisuse,
/// Advertisement is invalid.
Invalid(InsertAdvertisementError),
}
Expand All @@ -933,8 +952,9 @@ impl AdvertisementError {
use AdvertisementError::*;
match self {
InvalidAssignment => Some(COST_WRONG_PARA),
ProtocolMisuse => Some(COST_PROTOCOL_MISUSE),
RelayParentUnknown | UndeclaredCollator | Invalid(_) => Some(COST_UNEXPECTED_MESSAGE),
UnknownPeer | ProtocolMismatch | SecondedLimitReached => None,
UnknownPeer | SecondedLimitReached => None,
}
}
}
Expand Down Expand Up @@ -1042,6 +1062,13 @@ where
.get(&relay_parent)
.map(|s| s.child("advertise-collation"));

let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;

if peer_data.version == CollationVersion::V1 && !state.active_leaves.contains_key(&relay_parent)
{
return Err(AdvertisementError::ProtocolMisuse)
}

let per_relay_parent = state
.per_relay_parent
.get(&relay_parent)
Expand All @@ -1050,20 +1077,12 @@ where
let relay_parent_mode = per_relay_parent.prospective_parachains_mode;
let assignment = &per_relay_parent.assignment;

let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;
let collator_para_id =
peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?;

match assignment.current {
Some(id) if id == collator_para_id => {
// Our assignment.
},
_ => return Err(AdvertisementError::InvalidAssignment),
};

if relay_parent_mode.is_enabled() && prospective_candidate.is_none() {
// Expected v2 advertisement.
return Err(AdvertisementError::ProtocolMismatch)
// Check if this is assigned to us.
if assignment.current.map_or(true, |id| id != collator_para_id) {
return Err(AdvertisementError::InvalidAssignment)
}

// Always insert advertisements that pass all the checks for spam protection.
Expand All @@ -1077,13 +1096,17 @@ where
&state.active_leaves,
)
.map_err(AdvertisementError::Invalid)?;

if !per_relay_parent.collations.is_seconded_limit_reached(relay_parent_mode) {
return Err(AdvertisementError::SecondedLimitReached)
}

if let Some((candidate_hash, parent_head_data_hash)) = prospective_candidate {
let is_seconding_allowed = !relay_parent_mode.is_enabled() ||
can_second(
// We need to queue the advertisement if we are not allowed to second it.
//
// This is also only important when async backing is enabled.
let queue_advertisement = relay_parent_mode.is_enabled() &&
!can_second(
sender,
collator_para_id,
relay_parent,
Expand All @@ -1092,7 +1115,7 @@ where
)
.await;

if !is_seconding_allowed {
if queue_advertisement {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
Expand Down Expand Up @@ -1125,6 +1148,7 @@ where
prospective_candidate,
)
.await;

if let Err(fetch_error) = result {
gum::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1477,7 +1501,7 @@ async fn process_msg<Context>(
},
};
let fetched_collation = FetchedCollation::from(&receipt.to_plain());
if let Some(CollationEvent { collator_id, pending_collation }) =
if let Some(CollationEvent { collator_id, pending_collation, .. }) =
state.fetched_candidates.remove(&fetched_collation)
{
let PendingCollation { relay_parent, peer_id, prospective_candidate, .. } =
Expand Down Expand Up @@ -1635,7 +1659,7 @@ async fn run_inner<Context>(
Ok(res) => res
};

let CollationEvent {collator_id, pending_collation} = res.collation_event.clone();
let CollationEvent {collator_id, pending_collation, .. } = res.collation_event.clone();
if let Err(err) = kick_off_seconding(&mut ctx, &mut state, res).await {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1783,39 +1807,39 @@ async fn kick_off_seconding<Context>(
},
};
let collations = &mut per_relay_parent.collations;
let relay_parent_mode = per_relay_parent.prospective_parachains_mode;

let fetched_collation = FetchedCollation::from(&candidate_receipt);
if let Entry::Vacant(entry) = state.fetched_candidates.entry(fetched_collation) {
collation_event.pending_collation.commitments_hash =
Some(candidate_receipt.commitments_hash);

let pvd =
match (relay_parent_mode, collation_event.pending_collation.prospective_candidate) {
(
ProspectiveParachainsMode::Enabled { .. },
Some(ProspectiveCandidate { parent_head_data_hash, .. }),
) =>
request_prospective_validation_data(
ctx.sender(),
relay_parent,
parent_head_data_hash,
pending_collation.para_id,
)
.await?,
(ProspectiveParachainsMode::Disabled, _) =>
request_persisted_validation_data(
ctx.sender(),
candidate_receipt.descriptor().relay_parent,
candidate_receipt.descriptor().para_id,
)
.await?,
_ => {
// `handle_advertisement` checks for protocol mismatch.
return Ok(())
},
}
.ok_or(SecondingError::PersistedValidationDataNotFound)?;
let pvd = match (
collation_event.collator_protocol_version,
collation_event.pending_collation.prospective_candidate,
) {
(CollationVersion::V2, Some(ProspectiveCandidate { parent_head_data_hash, .. }))
if per_relay_parent.prospective_parachains_mode.is_enabled() =>
request_prospective_validation_data(
ctx.sender(),
relay_parent,
parent_head_data_hash,
pending_collation.para_id,
)
.await?,
// Support V2 collators without async backing enabled.
(CollationVersion::V2, Some(_)) | (CollationVersion::V1, _) =>
request_persisted_validation_data(
ctx.sender(),
candidate_receipt.descriptor().relay_parent,
candidate_receipt.descriptor().para_id,
)
.await?,
_ => {
// `handle_advertisement` checks for protocol mismatch.
return Ok(())
},
}
.ok_or(SecondingError::PersistedValidationDataNotFound)?;

fetched_collation_sanity_check(
&collation_event.pending_collation,
Expand Down Expand Up @@ -1864,7 +1888,8 @@ async fn handle_collation_fetch_response(
network_error_freq: &mut gum::Freq,
canceled_freq: &mut gum::Freq,
) -> std::result::Result<PendingCollationFetch, Option<(PeerId, Rep)>> {
let (CollationEvent { collator_id, pending_collation }, response) = response;
let (CollationEvent { collator_id, collator_protocol_version, pending_collation }, response) =
response;
// Remove the cancellation handle, as the future already completed.
state.collation_requests_cancel_handles.remove(&pending_collation);

Expand Down Expand Up @@ -1970,7 +1995,11 @@ async fn handle_collation_fetch_response(

metrics_result = Ok(());
Ok(PendingCollationFetch {
collation_event: CollationEvent { collator_id, pending_collation },
collation_event: CollationEvent {
collator_id,
pending_collation,
collator_protocol_version,
},
candidate_receipt,
pov,
})
Expand Down
Loading

0 comments on commit b967ba5

Please sign in to comment.