Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Revert "collator-protocol: short-term fixes for connectivity (#4640)" (
Browse files Browse the repository at this point in the history
…#4914)

* Revert "collator-protocol: fix wrong warning (#4909)"

This reverts commit c02c24d.

* Revert "collator-protocol: short-term fixes for connectivity (#4640)"

This reverts commit 5b74841.

* make the slots great again

Co-authored-by: Andronik <write@reusable.software>
  • Loading branch information
rphmeier and ordian authored Feb 18, 2022
1 parent 80676a2 commit 502862f
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 165 deletions.
130 changes: 46 additions & 84 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,15 @@ impl State {

/// Distribute a collation.
///
/// If the para is not scheduled on any core, at the relay parent,
/// or the relay parent isn't in our view or we already collated on the relay parent,
/// we ignore the message as it must be invalid in that case -
/// although this indicates a logic error elsewhere in the node.
///
/// Otherwise, start advertising the collation to interested peers.
/// Figure out the core our para is assigned to and the relevant validators.
/// Issue a connection request to these validators.
/// If the para is not scheduled or next up on any core, at the relay-parent,
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
async fn distribute_collation<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
id: ParaId,
receipt: CandidateReceipt,
Expand Down Expand Up @@ -357,8 +358,32 @@ where
return Ok(())
}

if !state.our_validators_groups.contains_key(&relay_parent) {
tracing::warn!(target: LOG_TARGET, "Could not determine validators assigned to the core.");
// Determine which core the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => {
tracing::warn!(
target: LOG_TARGET,
para_id = %id,
?relay_parent,
"looks like no core is assigned to {} at {}", id, relay_parent,
);

return Ok(())
},
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

if current_validators.validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);

return Ok(())
}
Expand All @@ -369,9 +394,16 @@ where
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
pov_hash = ?pov.hash(),
"Accepted collation",
core = ?our_core,
?current_validators,
"Accepted collation, connecting to validators."
);

// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;

state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());

if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(receipt.hash(), result_sender);
}
Expand Down Expand Up @@ -490,7 +522,7 @@ where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
// ignore address resolution failure
// will reissue a new request on new relay parent
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
Expand Down Expand Up @@ -601,7 +633,8 @@ where
);
},
Some(id) => {
distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender)
.await?;
},
None => {
tracing::warn!(
Expand Down Expand Up @@ -886,7 +919,7 @@ where
},
OurViewChange(view) => {
tracing::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(ctx, runtime, state, view).await?;
handle_our_view_change(state, view).await?;
},
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
Expand All @@ -900,16 +933,7 @@ where
}

/// Handles our view changes.
async fn handle_our_view_change<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
view: OurView,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
for removed in state.view.difference(&view) {
tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");

Expand Down Expand Up @@ -943,68 +967,6 @@ where
}

state.view = view;
if state.view.is_empty() {
return Ok(())
}

let id = match state.collating_on {
Some(id) => id,
None => return Ok(()),
};

// all validators assigned to the core
// across all active leaves
// this is typically our current group
// but can also include the previous group at
// rotation boundaries and considering forks
let mut group_validators = HashSet::new();
let mut maybe_core = None;

for relay_parent in state.view.iter().cloned() {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
para_id = ?id,
"Processing relay parent.",
);

// Determine our assigned core.
// If it is not scheduled then ignore the relay parent.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => {
maybe_core = Some(core);
core
},
None => continue,
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

let validators = current_validators.validators;
group_validators.extend(validators);

state.our_validators_groups.entry(relay_parent).or_insert(ValidatorGroup::new());
}

let validators: Vec<_> = group_validators.into_iter().collect();
let no_one_is_assigned = validators.is_empty();
if no_one_is_assigned {
if let Some(core) = maybe_core {
tracing::warn!(target: LOG_TARGET, ?core, "No validators assigned to our core.");
} else {
tracing::debug!(target: LOG_TARGET, "Core is occupied for all active leaves.");
}
return Ok(())
}
tracing::debug!(
target: LOG_TARGET,
?validators,
para_id = ?id,
"Connecting to validators.",
);
connect_to_validators(ctx, validators).await;

Ok(())
}
Expand Down
Loading

0 comments on commit 502862f

Please sign in to comment.