Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collation-generation + collator-protocol: collate on multiple assigned cores #3795

Merged
merged 31 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8f5d259
DistributeCollation includes CoreIndex
sandreim Mar 19, 2024
dc84796
SubmitCollationParams includes CoreIndex
sandreim Mar 19, 2024
665ef06
collator-protocol updates
sandreim Mar 19, 2024
78ee8b2
collation-generation pull support for chained collations
sandreim Mar 19, 2024
a61d64a
adjust validator buffer capacity for elastic scaling
sandreim Mar 20, 2024
1cbb090
fmt
sandreim Mar 20, 2024
493a7a2
Per core index tracking of multiple collations
sandreim Mar 20, 2024
8497413
fix tests
sandreim Mar 20, 2024
0dbc63a
enable collator-protocol elastic scaling extension for adder colaltor
sandreim Mar 22, 2024
3906d34
Use backing state to get candidates pending availability
sandreim Mar 22, 2024
eb7d842
Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand…
sandreim Mar 22, 2024
d344a2d
use ok_or
sandreim Mar 22, 2024
16e1cb4
subsystem-util request_para_backing_state
sandreim Mar 22, 2024
91f9c40
Use the exposed claim queue
sandreim Mar 22, 2024
154f3dd
refactor
sandreim Mar 22, 2024
60ec79f
remove type annotation
sandreim Mar 22, 2024
a6c4afe
remove more merge damage
sandreim Mar 22, 2024
62c13dc
make tests compile
sandreim Mar 22, 2024
ddf057e
happy clippy is good clippy
sandreim Mar 22, 2024
13b149f
lookahead use core_index in SubmitCollationParams
sandreim Mar 25, 2024
64a53f4
taplo fix
sandreim Mar 25, 2024
732440b
enable elastic scaling in undying collator (also enables in zombienet…
sandreim Mar 25, 2024
f80897d
Collation generation test fixes and new tests
sandreim Mar 25, 2024
7d8e6b5
Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand…
sandreim Mar 25, 2024
83af4e1
bring test-runtime up to date
sandreim Mar 25, 2024
9721b69
review feedback
sandreim Mar 25, 2024
4eb804e
await await
sandreim Mar 25, 2024
8b70bc2
review feedback
sandreim Mar 27, 2024
16607dd
prdoc
sandreim Mar 27, 2024
ba80669
Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand…
sandreim Mar 27, 2024
69a63cc
nice new syntax
sandreim Mar 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use polkadot_node_subsystem::messages::{
CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};
use polkadot_primitives::{CollatorPair, CoreIndex, Id as ParaId, OccupiedCoreAssumption};

use futures::{channel::oneshot, prelude::*};
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
Expand Down Expand Up @@ -184,7 +184,15 @@ where
while let Some(relay_parent_header) = import_notifications.next().await {
let relay_parent = relay_parent_header.hash();

if !is_para_scheduled(relay_parent, params.para_id, &mut params.overseer_handle).await {
let core_index = if let Some(core_index) = fist_core_scheduled_for_para(
sandreim marked this conversation as resolved.
Show resolved Hide resolved
relay_parent,
params.para_id,
&mut params.overseer_handle,
)
.await
{
core_index
} else {
tracing::trace!(
target: crate::LOG_TARGET,
?relay_parent,
Expand All @@ -193,7 +201,7 @@ where
);

continue
}
};

let max_pov_size = match params
.relay_client
Expand Down Expand Up @@ -396,6 +404,7 @@ where
parent_head: parent_header.encode().into(),
validation_code_hash,
result_sender: None,
core_index,
},
),
"SubmitCollation",
Expand Down Expand Up @@ -480,14 +489,12 @@ async fn max_ancestry_lookback(
}
}

// Checks if there exists a scheduled core for the para at the provided relay parent.
//
// Falls back to `false` in case of an error.
async fn is_para_scheduled(
// Checks the first `CoreIndex` assigned to the para at the provided relay parent.
sandreim marked this conversation as resolved.
Show resolved Hide resolved
async fn fist_core_scheduled_for_para(
relay_parent: PHash,
para_id: ParaId,
overseer_handle: &mut OverseerHandle,
) -> bool {
) -> Option<CoreIndex> {
let (tx, rx) = oneshot::channel();
let request = RuntimeApiRequest::AvailabilityCores(tx);
overseer_handle
Expand All @@ -503,17 +510,20 @@ async fn is_para_scheduled(
?relay_parent,
"Failed to query availability cores runtime API",
);
return false
return None
},
Err(oneshot::Canceled) => {
tracing::error!(
target: crate::LOG_TARGET,
?relay_parent,
"Sender for availability cores runtime request dropped",
);
return false
return None
},
};

cores.iter().any(|core| core.para_id() == Some(para_id))
cores
.iter()
.position(|core| core.para_id() == Some(para_id))
.map(|index| CoreIndex(index as _))
}
2 changes: 2 additions & 0 deletions polkadot/node/collation-generation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub enum Error {
Util(#[from] polkadot_node_subsystem_util::Error),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error("Parachain backing state not available in runtime.")]
MissingParaBackingState,
}

pub type Result<T> = std::result::Result<T, Error>;
182 changes: 108 additions & 74 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util::{
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_persisted_validation_data, request_validation_code,
request_validation_code_hash, request_validators,
request_claim_queue, request_para_backing_state, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
Expand Down Expand Up @@ -212,38 +212,41 @@ async fn handle_new_activations<Context>(
if config.collator.is_none() {
return Ok(())
}
let para_id = config.para_id;

let _overall_timer = metrics.time_new_activations();

for relay_parent in activated {
let _relay_parent_timer = metrics.time_new_activations_relay_parent();

let (availability_cores, validators, async_backing_params) = join!(
let (availability_cores, validators, async_backing_params, para_backing_state) = join!(
request_availability_cores(relay_parent, ctx.sender()).await,
request_validators(relay_parent, ctx.sender()).await,
request_async_backing_params(relay_parent, ctx.sender()).await,
request_para_backing_state(relay_parent, config.para_id, ctx.sender()).await,
sandreim marked this conversation as resolved.
Show resolved Hide resolved
);

let availability_cores = availability_cores??;
let n_validators = validators??.len();
let async_backing_params = async_backing_params?.ok();
let n_validators = validators??.len();
let para_backing_state =
para_backing_state??.ok_or(crate::error::Error::MissingParaBackingState)?;

let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;

for (core_idx, core) in availability_cores.into_iter().enumerate() {
let _availability_core_timer = metrics.time_new_activations_availability_core();
// The loop bellow will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();

let (scheduled_core, assumption) = match core {
CoreState::Scheduled(scheduled_core) =>
(scheduled_core, OccupiedCoreAssumption::Free),
for (core_idx, core) in availability_cores.into_iter().enumerate() {
let scheduled_core = match core {
CoreState::Scheduled(scheduled_core) => scheduled_core,
CoreState::Occupied(occupied_core) => match async_backing_params {
Some(params) if params.max_candidate_depth >= 1 => {
// maximum candidate depth when building on top of a block
// pending availability is necessarily 1 - the depth of the
// pending block is 0 so the child has depth 1.

// TODO [now]: this assumes that next up == current.
// in practice we should only set `OccupiedCoreAssumption::Included`
// when the candidate occupying the core is also of the same para.
// Use claim queue if available, or fallback to `next_up_on_available`
let res = match maybe_claim_queue {
Some(ref claim_queue) => {
// read what's in the claim queue for this core
Expand All @@ -257,8 +260,7 @@ async fn handle_new_activations<Context>(
// `next_up_on_available`
occupied_core.next_up_on_available
},
}
.map(|scheduled| (scheduled, OccupiedCoreAssumption::Included));
};

match res {
Some(res) => res,
Expand All @@ -279,7 +281,7 @@ async fn handle_new_activations<Context>(
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
"core is free. Keep going.",
"core is not assigned to any para. Keep going.",
);
continue
},
Expand All @@ -297,61 +299,82 @@ async fn handle_new_activations<Context>(
continue
}

// we get validation data and validation code synchronously for each core instead of
// within the subtask loop, because we have only a single mutable handle to the
// context, so the work can't really be distributed

let validation_data = match request_persisted_validation_data(
relay_parent,
scheduled_core.para_id,
assumption,
ctx.sender(),
)
.await
.await??
{
Some(v) => v,
None => {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
our_para = %config.para_id,
their_para = %scheduled_core.para_id,
"validation data is not available",
);
continue
},
};
// Accumulate cores for building collation(s) outside the loop.
cores_to_build_on.push(CoreIndex(core_idx as u32));
sandreim marked this conversation as resolved.
Show resolved Hide resolved
}

let validation_code_hash = match obtain_validation_code_hash_with_assumption(
relay_parent,
scheduled_core.para_id,
assumption,
ctx.sender(),
)
.await?
{
Some(v) => v,
None => {
gum::trace!(
target: LOG_TARGET,
core_idx = %core_idx,
relay_parent = ?relay_parent,
our_para = %config.para_id,
their_para = %scheduled_core.para_id,
"validation code hash is not found.",
);
continue
},
};
// Skip to next relay parent if there is no core assigned to us.
if cores_to_build_on.is_empty() {
continue
}

let task_config = config.clone();
let metrics = metrics.clone();
let mut task_sender = ctx.sender().clone();
ctx.spawn(
"collation-builder",
Box::pin(async move {
// We are being very optimistic here, but one of the cores could pend availability some more
// block, ore even time out.
// For timeout assumption the collator can't really know because it doesn't receive bitfield
// gossip.
let assumption = if para_backing_state.pending_availability.is_empty() {
OccupiedCoreAssumption::Free
} else {
OccupiedCoreAssumption::Included
};

gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
?assumption,
"Occupied core(s) assumption",
);

let mut validation_data = match request_persisted_validation_data(
relay_parent,
config.para_id,
assumption,
ctx.sender(),
)
.await
.await??
{
Some(v) => v,
None => {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
"validation data is not available",
);
continue
},
};

let validation_code_hash = match obtain_validation_code_hash_with_assumption(
relay_parent,
config.para_id,
assumption,
ctx.sender(),
)
.await?
{
Some(v) => v,
None => {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
our_para = %config.para_id,
"validation code hash is not found.",
);
continue
},
};

let task_config = config.clone();
let metrics = metrics.clone();
let mut task_sender = ctx.sender().clone();

ctx.spawn(
"chained-collation-builder",
Box::pin(async move {
for core_index in cores_to_build_on {
let collator_fn = match task_config.collator.as_ref() {
Some(x) => x,
None => return,
Expand All @@ -363,31 +386,37 @@ async fn handle_new_activations<Context>(
None => {
gum::debug!(
target: LOG_TARGET,
para_id = %scheduled_core.para_id,
?para_id,
"collator returned no collation on collate",
);
return
},
};

let parent_head = collation.head_data.clone();
construct_and_distribute_receipt(
PreparedCollation {
collation,
para_id: scheduled_core.para_id,
para_id,
relay_parent,
validation_data,
validation_data: validation_data.clone(),
validation_code_hash,
n_validators,
core_index,
},
task_config.key.clone(),
&mut task_sender,
result_sender,
&metrics,
)
.await;
}),
)?;
}

// Chain the collations. All else stays the same as we build the chained
// collation on same relay parent.
validation_data.parent_head = parent_head;
}
}),
)?;
}

Ok(())
Expand All @@ -408,6 +437,7 @@ async fn handle_submit_collation<Context>(
parent_head,
validation_code_hash,
result_sender,
core_index,
} = params;

let validators = request_validators(relay_parent, ctx.sender()).await.await??;
Expand Down Expand Up @@ -444,6 +474,7 @@ async fn handle_submit_collation<Context>(
validation_data,
validation_code_hash,
n_validators,
core_index,
};

construct_and_distribute_receipt(
Expand All @@ -465,6 +496,7 @@ struct PreparedCollation {
validation_data: PersistedValidationData,
validation_code_hash: ValidationCodeHash,
n_validators: usize,
core_index: CoreIndex,
}

/// Takes a prepared collation, along with its context, and produces a candidate receipt
Expand All @@ -483,6 +515,7 @@ async fn construct_and_distribute_receipt(
validation_data,
validation_code_hash,
n_validators,
core_index,
} = collation;

let persisted_validation_data_hash = validation_data.hash();
Expand Down Expand Up @@ -578,6 +611,7 @@ async fn construct_and_distribute_receipt(
pov,
parent_head_data,
result_sender,
core_index,
})
.await;
}
Expand Down
Loading
Loading