Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not force collators to update after enabling async backing #1920

Merged
merged 11 commits into from
Oct 19, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};

use futures::{future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::{
peer_set::CollationVersion,
request_response::{outgoing::RequestError, v1 as request_v1, OutgoingResult},
PeerId,
};
Expand Down Expand Up @@ -160,6 +161,8 @@ pub fn fetched_collation_sanity_check(
pub struct CollationEvent {
/// Collator id.
pub collator_id: CollatorId,
/// The network protocol version the collator is using.
pub collator_protocol_version: CollationVersion,
/// The requested collation data.
pub pending_collation: PendingCollation,
}
Expand Down Expand Up @@ -307,6 +310,8 @@ pub(super) struct CollationFetchRequest {
pub pending_collation: PendingCollation,
/// Collator id.
pub collator_id: CollatorId,
/// The network protocol version the collator is using.
pub collator_protocol_version: CollationVersion,
/// Responses from collator.
pub from_collator: BoxFuture<'static, OutgoingResult<request_v1::CollationFetchingResponse>>,
/// Handle used for checking if this request was cancelled.
Expand Down Expand Up @@ -334,6 +339,7 @@ impl Future for CollationFetchRequest {
self.span.as_mut().map(|s| s.add_string_tag("success", "false"));
return Poll::Ready((
CollationEvent {
collator_protocol_version: self.collator_protocol_version,
collator_id: self.collator_id.clone(),
pending_collation: self.pending_collation,
},
Expand All @@ -344,6 +350,7 @@ impl Future for CollationFetchRequest {
let res = self.from_collator.poll_unpin(cx).map(|res| {
(
CollationEvent {
collator_protocol_version: self.collator_protocol_version,
collator_id: self.collator_id.clone(),
pending_collation: self.pending_collation,
},
Expand Down
161 changes: 95 additions & 66 deletions polkadot/node/network/collator-protocol/src/validator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature");
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for the wrong para");
const COST_PROTOCOL_MISUSE: Rep =
Rep::Malicious("A collator advertising a collation for an async backing relay parent using V1");
const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected");
const BENEFIT_NOTIFY_GOOD: Rep =
Rep::BenefitMinor("A collator was noted good by another subsystem");
Expand Down Expand Up @@ -144,9 +146,6 @@ enum InsertAdvertisementError {
UndeclaredCollator,
/// A limit for announcements per peer is reached.
PeerLimitReached,
/// Mismatch of relay parent mode and advertisement arguments.
/// An internal error that should not happen.
ProtocolMismatch,
}

#[derive(Debug)]
Expand Down Expand Up @@ -252,23 +251,41 @@ impl PeerData {
},
(
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. },
Some(candidate_hash),
candidate_hash,
) => {
if state
.advertisements
.get(&on_relay_parent)
.map_or(false, |candidates| candidates.contains(&candidate_hash))
{
return Err(InsertAdvertisementError::Duplicate)
}
let candidates = state.advertisements.entry(on_relay_parent).or_default();

if candidates.len() > max_candidate_depth {
return Err(InsertAdvertisementError::PeerLimitReached)
}
candidates.insert(candidate_hash);
if let Some(candidate_hash) = candidate_hash {
if state
.advertisements
.get(&on_relay_parent)
.map_or(false, |candidates| candidates.contains(&candidate_hash))
{
return Err(InsertAdvertisementError::Duplicate)
}

let candidates =
state.advertisements.entry(on_relay_parent).or_default();

if candidates.len() > max_candidate_depth {
return Err(InsertAdvertisementError::PeerLimitReached)
}
candidates.insert(candidate_hash);
} else {
if self.version != CollationVersion::V1 {
gum::error!(
target: LOG_TARGET,
"Programming error, `candidate_hash` can not be `None` \
for non `V1` networking.",
);
}

if state.advertisements.contains_key(&on_relay_parent) {
return Err(InsertAdvertisementError::Duplicate)
}
state
.advertisements
.insert(on_relay_parent, HashSet::from_iter(candidate_hash));
};
},
_ => return Err(InsertAdvertisementError::ProtocolMismatch),
}

state.last_active = Instant::now();
Expand Down Expand Up @@ -705,6 +722,7 @@ async fn request_collation(
let collation_request = CollationFetchRequest {
pending_collation,
collator_id: collator_id.clone(),
collator_protocol_version: peer_protocol_version,
from_collator: response_recv.boxed(),
cancellation_token: cancellation_token.clone(),
span: state
Expand Down Expand Up @@ -920,10 +938,11 @@ enum AdvertisementError {
UndeclaredCollator,
/// We're assigned to a different para at the given relay parent.
InvalidAssignment,
/// An advertisement format doesn't match the relay parent.
ProtocolMismatch,
/// Para reached a limit of seconded candidates for this relay parent.
SecondedLimitReached,
/// Collator trying to advertise a collation using V1 protocol for an async backing relay
/// parent.
ProtocolMisuse,
/// Advertisement is invalid.
Invalid(InsertAdvertisementError),
}
Expand All @@ -933,8 +952,9 @@ impl AdvertisementError {
use AdvertisementError::*;
match self {
InvalidAssignment => Some(COST_WRONG_PARA),
ProtocolMisuse => Some(COST_PROTOCOL_MISUSE),
RelayParentUnknown | UndeclaredCollator | Invalid(_) => Some(COST_UNEXPECTED_MESSAGE),
UnknownPeer | ProtocolMismatch | SecondedLimitReached => None,
UnknownPeer | SecondedLimitReached => None,
}
}
}
Expand Down Expand Up @@ -1042,6 +1062,13 @@ where
.get(&relay_parent)
.map(|s| s.child("advertise-collation"));

let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;

if peer_data.version == CollationVersion::V1 && !state.active_leaves.contains_key(&relay_parent)
eskimor marked this conversation as resolved.
Show resolved Hide resolved
{
return Err(AdvertisementError::ProtocolMisuse)
}

let per_relay_parent = state
.per_relay_parent
.get(&relay_parent)
Expand All @@ -1050,20 +1077,12 @@ where
let relay_parent_mode = per_relay_parent.prospective_parachains_mode;
let assignment = &per_relay_parent.assignment;

let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;
let collator_para_id =
peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?;

match assignment.current {
Some(id) if id == collator_para_id => {
// Our assignment.
},
_ => return Err(AdvertisementError::InvalidAssignment),
};

if relay_parent_mode.is_enabled() && prospective_candidate.is_none() {
// Expected v2 advertisement.
return Err(AdvertisementError::ProtocolMismatch)
// Check if this is assigned to us.
if assignment.current.map_or(true, |id| id != collator_para_id) {
return Err(AdvertisementError::InvalidAssignment)
}

// Always insert advertisements that pass all the checks for spam protection.
Expand All @@ -1077,13 +1096,17 @@ where
&state.active_leaves,
)
.map_err(AdvertisementError::Invalid)?;

if !per_relay_parent.collations.is_seconded_limit_reached(relay_parent_mode) {
return Err(AdvertisementError::SecondedLimitReached)
}

if let Some((candidate_hash, parent_head_data_hash)) = prospective_candidate {
let is_seconding_allowed = !relay_parent_mode.is_enabled() ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!relay_parent_mode.is_enabled() || is there in case two nodes communicate with V2 but async backing is still off, meaning we don't need to check for can_second -- there's only one slot for candidate per relay parent anyway.

Also, seems like this branch won't be executed at all even with async backing enabled if collator chooses to communicate with v1, which opens some room for spamming the node with validation work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1920 (comment)

Also, seems like this branch won't be executed at all even with async backing enabled if collator chooses to communicate with v1, which opens some room for spamming the node with validation work.

You mean because we would add the collation without further checks?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1920 (comment)

In the previous version of this PR use_non_prospective_chains_mode was defined based on what collator told us, in the original code it relies on the runtime request from validators.

You mean because we would add the collation without further checks?

The idea is to only request collation once the parent parachain block becomes backable. Until then, we don't do any unnecessary work. With this code collator can

  1. Choose to use v1 networking
  2. Send maximum number of sequentially based collations and validators will start requesting & validating without waiting for the "backable parent" part.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, in the v1 networking version it should only accept parent_head_data_hash == current_head_hash.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, in the v1 networking version it should only accept parent_head_data_hash == current_head_hash.

Should be done now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this line should be reverted:

} else {
// Relay parent is unknown or async backing is disabled.
false

If validator and collator both use v2 while async backing is disabled this will block all advertisements.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @slumber

can_second(
// We need to queue the advertisement if we are not allowed to second it.
//
// This is also only important when async backing is enabled.
let queue_advertisement = relay_parent_mode.is_enabled() &&
!can_second(
sender,
collator_para_id,
relay_parent,
Expand All @@ -1092,7 +1115,7 @@ where
)
.await;

if !is_seconding_allowed {
if queue_advertisement {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
Expand Down Expand Up @@ -1125,6 +1148,7 @@ where
prospective_candidate,
)
.await;

if let Err(fetch_error) = result {
gum::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1477,7 +1501,7 @@ async fn process_msg<Context>(
},
};
let fetched_collation = FetchedCollation::from(&receipt.to_plain());
if let Some(CollationEvent { collator_id, pending_collation }) =
if let Some(CollationEvent { collator_id, pending_collation, .. }) =
state.fetched_candidates.remove(&fetched_collation)
{
let PendingCollation { relay_parent, peer_id, prospective_candidate, .. } =
Expand Down Expand Up @@ -1635,7 +1659,7 @@ async fn run_inner<Context>(
Ok(res) => res
};

let CollationEvent {collator_id, pending_collation} = res.collation_event.clone();
let CollationEvent {collator_id, pending_collation, .. } = res.collation_event.clone();
if let Err(err) = kick_off_seconding(&mut ctx, &mut state, res).await {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1783,39 +1807,39 @@ async fn kick_off_seconding<Context>(
},
};
let collations = &mut per_relay_parent.collations;
let relay_parent_mode = per_relay_parent.prospective_parachains_mode;

let fetched_collation = FetchedCollation::from(&candidate_receipt);
if let Entry::Vacant(entry) = state.fetched_candidates.entry(fetched_collation) {
collation_event.pending_collation.commitments_hash =
Some(candidate_receipt.commitments_hash);

let pvd =
match (relay_parent_mode, collation_event.pending_collation.prospective_candidate) {
(
ProspectiveParachainsMode::Enabled { .. },
Some(ProspectiveCandidate { parent_head_data_hash, .. }),
) =>
request_prospective_validation_data(
ctx.sender(),
relay_parent,
parent_head_data_hash,
pending_collation.para_id,
)
.await?,
(ProspectiveParachainsMode::Disabled, _) =>
request_persisted_validation_data(
ctx.sender(),
candidate_receipt.descriptor().relay_parent,
candidate_receipt.descriptor().para_id,
)
.await?,
_ => {
// `handle_advertisement` checks for protocol mismatch.
return Ok(())
},
}
.ok_or(SecondingError::PersistedValidationDataNotFound)?;
let pvd = match (
collation_event.collator_protocol_version,
collation_event.pending_collation.prospective_candidate,
) {
(CollationVersion::V2, Some(ProspectiveCandidate { parent_head_data_hash, .. }))
if per_relay_parent.prospective_parachains_mode.is_enabled() =>
request_prospective_validation_data(
ctx.sender(),
relay_parent,
parent_head_data_hash,
pending_collation.para_id,
)
.await?,
// Support V2 collators without async backing enabled.
(CollationVersion::V2, Some(_)) | (CollationVersion::V1, _) =>
request_persisted_validation_data(
ctx.sender(),
candidate_receipt.descriptor().relay_parent,
candidate_receipt.descriptor().para_id,
)
.await?,
_ => {
// `handle_advertisement` checks for protocol mismatch.
return Ok(())
},
}
.ok_or(SecondingError::PersistedValidationDataNotFound)?;

fetched_collation_sanity_check(
&collation_event.pending_collation,
Expand Down Expand Up @@ -1864,7 +1888,8 @@ async fn handle_collation_fetch_response(
network_error_freq: &mut gum::Freq,
canceled_freq: &mut gum::Freq,
) -> std::result::Result<PendingCollationFetch, Option<(PeerId, Rep)>> {
let (CollationEvent { collator_id, pending_collation }, response) = response;
let (CollationEvent { collator_id, collator_protocol_version, pending_collation }, response) =
response;
// Remove the cancellation handle, as the future already completed.
state.collation_requests_cancel_handles.remove(&pending_collation);

Expand Down Expand Up @@ -1970,7 +1995,11 @@ async fn handle_collation_fetch_response(

metrics_result = Ok(());
Ok(PendingCollationFetch {
collation_event: CollationEvent { collator_id, pending_collation },
collation_event: CollationEvent {
collator_id,
pending_collation,
collator_protocol_version,
},
candidate_receipt,
pov,
})
Expand Down
Loading