Skip to content

Commit

Permalink
collator-protocol: short-term fixes for connectivity (paritytech#4640)
Browse files Browse the repository at this point in the history
* collator-protocol: add to reserved peers on every relay parent

* bump collator slots from 25 to 100

* collator-protocol: reduce inactivity timeout from 24s to 5s

* try to satisfy spellcheck

* add connection log

* fmt

* bring a warn back

* gather validators across all active leaves
  • Loading branch information
ordian authored and Wizdave97 committed Feb 3, 2022
1 parent 3a85e11 commit 9776606
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 127 deletions.
122 changes: 76 additions & 46 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,14 @@ impl State {

/// Distribute a collation.
///
/// Figure out the core our para is assigned to and the relevant validators.
/// Issue a connection request to these validators.
/// If the para is not scheduled or next up on any core, at the relay-parent,
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
/// If the para is not scheduled on any core, at the relay parent,
/// or the relay parent isn't in our view or we already collated on the relay parent,
/// we ignore the message as it must be invalid in that case -
/// although this indicates a logic error elsewhere in the node.
///
/// Otherwise, start advertising the collation to interested peers.
async fn distribute_collation<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
id: ParaId,
receipt: CandidateReceipt,
Expand Down Expand Up @@ -358,32 +357,8 @@ where
return Ok(())
}

// Determine which core the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => {
tracing::warn!(
target: LOG_TARGET,
para_id = %id,
?relay_parent,
"looks like no core is assigned to {} at {}", id, relay_parent,
);

return Ok(())
},
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

if current_validators.validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);
if !state.our_validators_groups.contains_key(&relay_parent) {
tracing::warn!(target: LOG_TARGET, "Could not determine validators assigned to the core.");

return Ok(())
}
Expand All @@ -394,16 +369,9 @@ where
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
pov_hash = ?pov.hash(),
core = ?our_core,
?current_validators,
"Accepted collation, connecting to validators."
"Accepted collation",
);

// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;

state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());

if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(receipt.hash(), result_sender);
}
Expand Down Expand Up @@ -522,7 +490,7 @@ where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
// ignore address resolution failure
// will reissue a new request on new collation
// will reissue a new request on new relay parent
let (failed, _) = oneshot::channel();
ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
Expand Down Expand Up @@ -633,8 +601,7 @@ where
);
},
Some(id) => {
distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender)
.await?;
distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
},
None => {
tracing::warn!(
Expand Down Expand Up @@ -919,7 +886,7 @@ where
},
OurViewChange(view) => {
tracing::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(state, view).await?;
handle_our_view_change(ctx, runtime, state, view).await?;
},
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
Expand All @@ -933,7 +900,16 @@ where
}

/// Handles our view changes.
async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
async fn handle_our_view_change<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
view: OurView,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
for removed in state.view.difference(&view) {
tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");

Expand Down Expand Up @@ -967,6 +943,60 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
}

state.view = view;
if state.view.is_empty() {
return Ok(())
}

let id = match state.collating_on {
Some(id) => id,
None => return Ok(()),
};

// all validators assigned to the core
// across all active leaves
// this is typically our current group
// but can also include the previous group at
// rotation boundaries and considering forks
let mut group_validators = HashSet::new();

for relay_parent in state.view.iter().cloned() {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
para_id = ?id,
"Processing relay parent.",
);

// Determine our assigned core.
// If it is not scheduled then ignore the relay parent.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => continue,
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

let validators = current_validators.validators;
group_validators.extend(validators);

state.our_validators_groups.entry(relay_parent).or_insert(ValidatorGroup::new());
}

let validators: Vec<_> = group_validators.into_iter().collect();
let no_one_is_assigned = validators.is_empty();
if no_one_is_assigned {
tracing::warn!(target: LOG_TARGET, "No validators assigned to our core.",);
return Ok(())
}
tracing::debug!(
target: LOG_TARGET,
?validators,
para_id = ?id,
"Connecting to validators.",
);
connect_to_validators(ctx, validators).await;

Ok(())
}
Expand Down
Loading

0 comments on commit 9776606

Please sign in to comment.