This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

[DO NOT MERGE] Revert "collator-protocol: short-term fixes for connectivity (#4640)" #4925

Closed
wants to merge 2 commits
122 changes: 46 additions & 76 deletions node/network/collator-protocol/src/collator_side/mod.rs
@@ -321,14 +321,15 @@ impl State {

/// Distribute a collation.
///
/// If the para is not scheduled on any core, at the relay parent,
/// or the relay parent isn't in our view or we already collated on the relay parent,
/// we ignore the message as it must be invalid in that case -
/// although this indicates a logic error elsewhere in the node.
///
/// Otherwise, start advertising the collation to interested peers.
/// Figure out the core our para is assigned to and the relevant validators.
/// Issue a connection request to these validators.
/// If the para is not scheduled or next up on any core, at the relay-parent,
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
async fn distribute_collation<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
id: ParaId,
receipt: CandidateReceipt,
@@ -357,8 +358,32 @@ where
return Ok(())
}

if !state.our_validators_groups.contains_key(&relay_parent) {
tracing::warn!(target: LOG_TARGET, "Could not determine validators assigned to the core.");
// Determine which core the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => {
tracing::warn!(
target: LOG_TARGET,
para_id = %id,
?relay_parent,
"looks like no core is assigned to {} at {}", id, relay_parent,
);

return Ok(())
},
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

if current_validators.validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);

return Ok(())
}
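
The restored path in this hunk first asks `determine_core` which availability core, if any, the para is scheduled on, and then asks `determine_our_validators` for the validator group serving that core. Group assignments rotate across cores over the course of a session, so the mapping is time-dependent. Below is a minimal, self-contained sketch of that kind of core-to-group mapping; the function name, parameters, and rotation rule are illustrative assumptions, not the actual runtime API these helpers query.

```rust
/// Illustrative core-to-group mapping under a simple round-robin rotation.
/// These names and the rotation rule are assumptions for the sketch, not the
/// actual Polkadot runtime API that `determine_our_validators` relies on.
fn group_for_core(
    core_index: u32,
    num_cores: u32,
    blocks_since_session_start: u32,
    rotation_frequency: u32,
) -> u32 {
    // Every `rotation_frequency` blocks, all groups shift by one core.
    let rotations = blocks_since_session_start / rotation_frequency;
    (core_index + rotations) % num_cores
}

fn main() {
    // Core 1 of 5, 25 blocks into the session, rotating every 10 blocks:
    // two rotations have occurred, so core 1 is currently served by group 3.
    assert_eq!(group_for_core(1, 5, 25, 10), 3);
}
```
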
@@ -369,9 +394,16 @@ where
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
pov_hash = ?pov.hash(),
"Accepted collation",
core = ?our_core,
?current_validators,
"Accepted collation, connecting to validators."
);

// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;

state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());

if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(receipt.hash(), result_sender);
}
@@ -490,7 +522,7 @@ where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
// ignore address resolution failure
// will reissue a new request on new relay parent
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
@@ -601,7 +633,8 @@ where
);
},
Some(id) => {
distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender)
.await?;
},
None => {
tracing::warn!(
@@ -886,7 +919,7 @@ where
},
OurViewChange(view) => {
tracing::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(ctx, runtime, state, view).await?;
handle_our_view_change(state, view).await?;
},
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
@@ -900,16 +933,7 @@ where
}

/// Handles our view changes.
async fn handle_our_view_change<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
view: OurView,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
for removed in state.view.difference(&view) {
tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");

@@ -943,60 +967,6 @@ where
}

state.view = view;
if state.view.is_empty() {
return Ok(())
}

let id = match state.collating_on {
Some(id) => id,
None => return Ok(()),
};

// all validators assigned to the core
// across all active leaves
// this is typically our current group
// but can also include the previous group at
// rotation boundaries and considering forks
let mut group_validators = HashSet::new();

for relay_parent in state.view.iter().cloned() {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
para_id = ?id,
"Processing relay parent.",
);

// Determine our assigned core.
// If it is not scheduled then ignore the relay parent.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => continue,
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

let validators = current_validators.validators;
group_validators.extend(validators);

state.our_validators_groups.entry(relay_parent).or_insert(ValidatorGroup::new());
}

let validators: Vec<_> = group_validators.into_iter().collect();
let no_one_is_assigned = validators.is_empty();
if no_one_is_assigned {
tracing::warn!(target: LOG_TARGET, "No validators assigned to our core.",);
return Ok(())
}
tracing::debug!(
target: LOG_TARGET,
?validators,
para_id = ?id,
"Connecting to validators.",
);
connect_to_validators(ctx, validators).await;

Ok(())
}
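
The updated comment in the `connect_to_validators` hunk, "will reissue a new request on new collation", reflects the fire-and-forget style of that call: the receiver half of the `failed` oneshot channel is dropped immediately, so address-resolution failures are silently ignored and a fresh connection request is simply issued with the next collation. Here is a small self-contained sketch of that pattern, assuming only the `futures` crate; `request_connection` is a made-up stand-in for sending `NetworkBridgeMessage::ConnectToValidators`.

```rust
use futures::channel::oneshot;

// Made-up helper for illustration: stands in for the network bridge, which
// would report a failed address resolution on the provided sender.
fn request_connection(report_failure: oneshot::Sender<()>) {
    // Sending errors out because the receiver was already dropped; the error
    // is ignored, mirroring how connection failures are treated.
    let _ = report_failure.send(());
}

fn main() {
    // Keep only the sender; dropping the receiver means nobody listens for
    // failures, matching `let (failed, _) = oneshot::channel();` above.
    let (failed, _) = oneshot::channel::<()>();
    request_connection(failed);
    println!("connection failure ignored; a new request follows the next collation");
}
```
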