Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

provisioner: async backing changes #5711

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions node/core/provisioner/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pub enum Error {
#[error("failed to get votes on dispute")]
CanceledCandidateVotes(#[source] oneshot::Canceled),

#[error("failed to get backable candidate")]
slumber marked this conversation as resolved.
Show resolved Hide resolved
CanceledProspectiveCandidateChild(#[source] oneshot::Canceled),

#[error("failed to get Runtime API version")]
CanceledRuntimeApiVersion(#[source] oneshot::Canceled),

#[error(transparent)]
ChainApi(#[from] ChainApiError),

Expand Down
228 changes: 206 additions & 22 deletions node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,27 @@

use bitvec::vec::BitVec;
use futures::{
channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered, FutureExt,
channel::oneshot, future::BoxFuture, lock::Mutex, prelude::*, stream::FuturesUnordered,
FutureExt,
};
use futures_timer::Delay;

use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{
jaeger,
messages::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
ProvisionerInherentData, ProvisionerMessage,
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage,
ProspectiveParachainsMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal,
PerLeafSpan, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{request_availability_cores, request_persisted_validation_data};
use polkadot_primitives::v2::{
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState,
DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
DisputeStatement, DisputeStatementSet, Hash, Id as ParaId, MultiDisputeStatementSet,
OccupiedCoreAssumption, SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
};
use std::collections::{BTreeMap, HashMap, HashSet};

Expand Down Expand Up @@ -70,23 +72,34 @@ impl ProvisionerSubsystem {
}
}

#[derive(Debug, Clone)]
enum ProspectiveParachainsMode {
Enabled,
Disabled {
// Without prospective parachains it's necessary
// to track backed candidates to choose from when assembling
// a relay chain block.
backed_candidates: Vec<CandidateReceipt>,
},
}

/// A per-relay-parent state for the provisioning subsystem.
pub struct PerRelayParent {
leaf: ActivatedLeaf,
backed_candidates: Vec<CandidateReceipt>,
prospective_parachains_mode: ProspectiveParachainsMode,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
is_inherent_ready: bool,
awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>,
span: PerLeafSpan,
}

impl PerRelayParent {
fn new(leaf: ActivatedLeaf) -> Self {
fn new(leaf: ActivatedLeaf, prospective_parachains_mode: ProspectiveParachainsMode) -> Self {
let span = PerLeafSpan::new(leaf.span.clone(), "provisioner");

Self {
leaf,
backed_candidates: Vec::new(),
prospective_parachains_mode,
signed_bitfields: Vec::new(),
is_inherent_ready: false,
awaiting_inherent: Vec::new(),
Expand Down Expand Up @@ -141,7 +154,7 @@ async fn run_iteration<Context>(
from_overseer = ctx.recv().fuse() => {
match from_overseer? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) =>
handle_active_leaves_update(update, per_relay_parent, inherent_delays),
handle_active_leaves_update(ctx.sender(), update, per_relay_parent, inherent_delays).await?,
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => {
Expand All @@ -163,20 +176,55 @@ async fn run_iteration<Context>(
}
}

fn handle_active_leaves_update(
async fn prospective_parachains_mode(
sender: &mut impl overseer::ProvisionerSenderTrait,
leaf_hash: Hash,
) -> Result<ProspectiveParachainsMode, Error> {
// TODO: call a Runtime API once staging version is available
// https://github.com/paritytech/substrate/discussions/11338
//
// Implementation should probably be shared with backing.

let (tx, rx) = oneshot::channel();
sender
.send_message(RuntimeApiMessage::Request(leaf_hash, RuntimeApiRequest::Version(tx)))
.await;

let version = rx.await.map_err(Error::CanceledRuntimeApiVersion)?.map_err(Error::Runtime)?;

if version == 3 {
Ok(ProspectiveParachainsMode::Enabled)
} else {
if version != 2 {
gum::warn!(
target: LOG_TARGET,
"Runtime API version is {}, expected 2 or 3. Prospective parachains are disabled",
version
);
}
Ok(ProspectiveParachainsMode::Disabled { backed_candidates: Vec::new() })
}
Comment on lines +194 to +205
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure how is this supposed to work, for v1 you get a warning, then for 2 you don't but this is still disabled. The comments say a different story than the intention of the code. Trying to simplify, is this what we want ?

Suggested change
if version == 3 {
Ok(ProspectiveParachainsMode::Enabled)
} else {
if version != 2 {
gum::warn!(
target: LOG_TARGET,
"Runtime API version is {}, expected 2 or 3. Prospective parachains are disabled",
version
);
}
Ok(ProspectiveParachainsMode::Disabled { backed_candidates: Vec::new() })
}
if version > 2 {
Ok(ProspectiveParachainsMode::Enabled)
} else {
gum::warn!(
target: LOG_TARGET,
"Runtime API version is {}, expected 3+. Prospective parachains are disabled",
version
);
Ok(ProspectiveParachainsMode::Disabled { backed_candidates: Vec::new() })
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We simply do not expect anything not equal to 2 or 3, this is by design.
Runtime API version is a temporary solution

}

async fn handle_active_leaves_update(
sender: &mut impl overseer::ProvisionerSenderTrait,
update: ActiveLeavesUpdate,
per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
inherent_delays: &mut InherentDelays,
) {
) -> Result<(), Error> {
for deactivated in &update.deactivated {
per_relay_parent.remove(deactivated);
}

for leaf in update.activated {
let prospective_parachains_mode = prospective_parachains_mode(sender, leaf.hash).await?;

let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf));
per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf, prospective_parachains_mode));
inherent_delays.push(delay_fut);
}

Ok(())
}

#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
Expand Down Expand Up @@ -219,7 +267,7 @@ async fn send_inherent_data_bg<Context>(
) -> Result<(), Error> {
let leaf = per_relay_parent.leaf.clone();
let signed_bitfields = per_relay_parent.signed_bitfields.clone();
let backed_candidates = per_relay_parent.backed_candidates.clone();
let prospective_parachains_mode = per_relay_parent.prospective_parachains_mode.clone();
let span = per_relay_parent.span.child("req-inherent-data");

let mut sender = ctx.sender().clone();
Expand All @@ -231,7 +279,7 @@ async fn send_inherent_data_bg<Context>(
if let Err(err) = send_inherent_data(
&leaf,
&signed_bitfields,
&backed_candidates,
&prospective_parachains_mode,
return_senders,
&mut sender,
&metrics,
Expand All @@ -245,7 +293,6 @@ async fn send_inherent_data_bg<Context>(
gum::debug!(
target: LOG_TARGET,
signed_bitfield_count = signed_bitfields.len(),
backed_candidates_count = backed_candidates.len(),
leaf_hash = ?leaf.hash,
"inherent data sent successfully"
);
Expand Down Expand Up @@ -279,7 +326,11 @@ fn note_provisionable_data(
.child("provisionable-backed")
.with_candidate(candidate_hash)
.with_para_id(backed_candidate.descriptor().para_id);
per_relay_parent.backed_candidates.push(backed_candidate)
if let ProspectiveParachainsMode::Disabled { backed_candidates } =
&mut per_relay_parent.prospective_parachains_mode
{
backed_candidates.push(backed_candidate)
}
},
_ => {},
}
Expand Down Expand Up @@ -307,7 +358,7 @@ type CoreAvailability = BitVec<u8, bitvec::order::Lsb0>;
async fn send_inherent_data(
leaf: &ActivatedLeaf,
bitfields: &[SignedAvailabilityBitfield],
candidates: &[CandidateReceipt],
prospective_parachains_mode: &ProspectiveParachainsMode,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut impl overseer::ProvisionerSenderTrait,
metrics: &Metrics,
Expand All @@ -326,8 +377,14 @@ async fn send_inherent_data(
select_availability_bitfields(&availability_cores, bitfields, &leaf.hash),
LeafStatus::Stale => Vec::new(),
};
let candidates =
select_candidates(&availability_cores, &bitfields, candidates, leaf.hash, from_job).await?;
let candidates = select_candidates(
&availability_cores,
&bitfields,
prospective_parachains_mode,
leaf.hash,
from_job,
)
.await?;

gum::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -422,14 +479,16 @@ fn select_availability_bitfields(
selected.into_iter().map(|(_, b)| b).collect()
}

/// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to each free core.
async fn select_candidates(
/// Selects candidates from tracked ones to note in a relay chain block.
///
/// Should be called when prospective parachains are disabled.
async fn select_candidate_hashes_from_tracked(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
candidates: &[CandidateReceipt],
relay_parent: Hash,
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Result<Vec<BackedCandidate>, Error> {
) -> Result<Vec<CandidateHash>, Error> {
let block_number = get_block_number_under_construction(relay_parent, sender).await?;

let mut selected_candidates =
Expand Down Expand Up @@ -503,6 +562,108 @@ async fn select_candidates(
}
}

Ok(selected_candidates)
}

/// Requests backable candidates from Prospective Parachains subsystem
/// based on core states.
///
/// Should be called when prospective parachains are enabled.
async fn request_backable_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
relay_parent: Hash,
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Result<Vec<CandidateHash>, Error> {
let block_number = get_block_number_under_construction(relay_parent, sender).await?;

// Wrapped sender is shared among concurrent prospective parachains requests.
let wrapped_sender = Mutex::new(sender);
let mut selected_candidates = Vec::with_capacity(availability_cores.len());

for (core_idx, core) in availability_cores.iter().enumerate() {
let (para_id, required_path) = match core {
CoreState::Scheduled(scheduled_core) => {
// The core is free, pick the first eligible candidate from
// the fragment tree.
(scheduled_core.para_id, Vec::new())
},
CoreState::Occupied(occupied_core) => {
if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability)
{
if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
// The candidate occupying the core is available, choose its
// child in the fragment tree.
//
// TODO: doesn't work for parathreads. We lean hard on the assumption
// that cores are fixed to specific parachains within a session.
// https://github.com/paritytech/polkadot/issues/5492
(scheduled_core.para_id, vec![occupied_core.candidate_hash])
} else {
continue
}
} else {
if occupied_core.time_out_at != block_number {
continue
}
if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
// Candidate's availability timed out, practically same as scheduled.
(scheduled_core.para_id, Vec::new())
} else {
continue
}
}
},
CoreState::Free => continue,
};

let fut = get_backable_candidate(relay_parent, para_id, required_path, &wrapped_sender);

selected_candidates.push(fut);
}

let selected_candidates = futures::future::try_join_all(selected_candidates).await?;
slumber marked this conversation as resolved.
Show resolved Hide resolved
let mut selected = Vec::with_capacity(selected_candidates.len());

for (core_idx, candidate_hash) in selected_candidates.into_iter().enumerate() {
match candidate_hash {
Some(hash) => selected.push(hash),
None => {
gum::debug!(
target: LOG_TARGET,
leaf_hash = ?relay_parent,
core = core_idx,
"No backable candidate returned by prospective parachains",
);
},
}
}

Ok(selected)
}

/// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to each free core.
async fn select_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
prospective_parachains_mode: &ProspectiveParachainsMode,
relay_parent: Hash,
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Result<Vec<BackedCandidate>, Error> {
let selected_candidates = match prospective_parachains_mode {
ProspectiveParachainsMode::Enabled =>
request_backable_candidates(availability_cores, bitfields, relay_parent, sender).await?,
ProspectiveParachainsMode::Disabled { backed_candidates } =>
select_candidate_hashes_from_tracked(
availability_cores,
bitfields,
&backed_candidates,
relay_parent,
sender,
)
.await?,
};

// now get the backed candidates corresponding to these candidate receipts
let (tx, rx) = oneshot::channel();
sender.send_unbounded_message(CandidateBackingMessage::GetBackedCandidates(
Expand Down Expand Up @@ -571,6 +732,29 @@ async fn get_block_number_under_construction(
}
}

/// Requests backable candidate from Prospective Parachains based on
/// the given path in the fragment tree.
async fn get_backable_candidate(
relay_parent: Hash,
para_id: ParaId,
required_path: Vec<CandidateHash>,
sender: &Mutex<&mut impl overseer::ProvisionerSenderTrait>,
) -> Result<Option<CandidateHash>, Error> {
let (tx, rx) = oneshot::channel();
sender
.lock()
.await
.send_message(ProspectiveParachainsMessage::GetBackableCandidate(
relay_parent,
para_id,
required_path,
tx,
))
.await;

rx.await.map_err(Error::CanceledProspectiveCandidateChild)
}

/// The availability bitfield for a given core is the transpose
/// of a set of signed availability bitfields. It goes like this:
///
Expand Down
Loading