backing: improve session buffering for runtime information #6284

Merged: 10 commits, Nov 13, 2024
224 changes: 177 additions & 47 deletions polkadot/node/core/backing/src/lib.rs
@@ -292,6 +292,154 @@ impl From<&ActiveLeafState> for ProspectiveParachainsMode {
}
}

macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(err) => {
// Only bubble up fatal errors.
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(None)
},
}
};
}

/// A cache for storing data per-session to reduce repeated
/// runtime API calls and avoid redundant computations.
struct PerSessionCache {
/// Cache for storing validators list, retrieved from the runtime.
validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
/// Cache for storing node features, retrieved from the runtime.
node_features_cache: LruMap<SessionIndex, Arc<NodeFeatures>>,
/// Cache for storing the minimum backing votes threshold, retrieved from the runtime.
minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
/// Cache for storing validator-to-group mappings, computed from validator groups.
validator_to_group_cache:
LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
}

impl Default for PerSessionCache {
/// Creates a new `PerSessionCache` with a default capacity.
fn default() -> Self {
Self::new(2)
}
}

impl PerSessionCache {
Review comment (Member): Note that we already have such a cache. I would suggest adjusting the existing implementation to cater to the needs here. For example, usage of multiple LRUs makes sense, so subsystems don't pay for what they don't use.
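For illustration only, not part of this PR: a minimal sketch of the "one small LRU per data item" idea, where a subsystem instantiates a tiny generic cache only for the fields it actually uses. The wrapper and method names below are hypothetical; the `schnellru` calls mirror those already used in this file, and the real code fetches values asynchronously instead of through a closure.

use std::sync::Arc;

use schnellru::{ByLength, LruMap};

type SessionIndex = u32;

// Hypothetical generic per-session cache for a single data item.
struct PerSessionValue<T> {
	inner: LruMap<SessionIndex, Arc<T>>,
}

impl<T> PerSessionValue<T> {
	fn new(capacity: u32) -> Self {
		Self { inner: LruMap::new(ByLength::new(capacity)) }
	}

	// Return the cached value for `session`, computing and caching it on a miss.
	fn get_or_insert_with(&mut self, session: SessionIndex, compute: impl FnOnce() -> T) -> Arc<T> {
		let value = self
			.inner
			.get_or_insert(session, || Arc::new(compute()))
			.expect("an entry was just inserted; qed");
		Arc::clone(value)
	}
}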

/// Creates a new `PerSessionCache` with a given capacity.
fn new(capacity: u32) -> Self {
PerSessionCache {
validators_cache: LruMap::new(ByLength::new(capacity)),
node_features_cache: LruMap::new(ByLength::new(capacity)),
minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
}
}

/// Gets validators from the cache or fetches them from the runtime if not present.
async fn get_or_fetch_validators(
Review comment (Contributor): Given that this is a cache struct, the function names are overly verbose. I'd suggest just validators(), node_features(), executor_params().
Reply (Member Author): Renamed.

&mut self,
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Option<Arc<Vec<ValidatorId>>>, Error> {
// Try to get the validators list from the cache.
if let Some(validators) = self.validators_cache.get(&session_index) {
return Ok(Some(Arc::clone(validators)));
}

// Fetch the validators list from the runtime since it was not in the cache.
let validators = request_validators(parent, sender)
.await
.await
.map_err(Error::RuntimeApiUnavailable)?;
let validators = try_runtime_api!(validators);
let validators_arc = Arc::new(validators);

// Cache the fetched validators list for future use.
self.validators_cache.insert(session_index, Arc::clone(&validators_arc));

Ok(Some(validators_arc))
}

/// Gets the node features from the cache or fetches it from the runtime if not present.
async fn get_or_fetch_node_features(
&mut self,
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Option<Arc<NodeFeatures>>, Error> {
// Try to get the node features from the cache.
if let Some(node_features) = self.node_features_cache.get(&session_index) {
return Ok(Some(Arc::clone(node_features)));
}

// Fetch the node features from the runtime since it was not in the cache.
let node_features = request_node_features(parent, session_index, sender)
.await?
.unwrap_or(NodeFeatures::EMPTY);
let node_features_arc = Arc::new(node_features);

// Cache the node features for future use.
self.node_features_cache.insert(session_index, Arc::clone(&node_features_arc));

Ok(Some(node_features_arc))
}

/// Gets the minimum backing votes threshold from the
/// cache or fetches it from the runtime if not present.
async fn get_or_fetch_minimum_backing_votes(
&mut self,
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Option<u32>, Error> {
// Try to get the value from the cache.
if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
return Ok(Some(*minimum_backing_votes));
}

// Fetch the value from the runtime since it was not in the cache.
let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender).await;
let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);

// Cache the fetched value for future use.
self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);

Ok(Some(minimum_backing_votes))
}

/// Gets or computes the validator-to-group mapping for a session.
fn get_or_compute_validator_to_group(
&mut self,
session_index: SessionIndex,
validators: &[ValidatorId],
validator_groups: &[Vec<ValidatorIndex>],
) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
let validator_to_group = self
.validator_to_group_cache
.get_or_insert(session_index, || {
let mut vector = vec![None; validators.len()];

for (group_idx, validator_group) in validator_groups.iter().enumerate() {
for validator in validator_group {
vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
}
}

Arc::new(IndexedVec::<_, _>::from(vector))
})
.expect("Just inserted");

Arc::clone(validator_to_group)
}
}
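
For illustration only, not part of this PR: a standalone worked example of the validator-to-group computation performed by `get_or_compute_validator_to_group`, using plain `Vec` and `u32` in place of `IndexedVec`, `ValidatorIndex`, and `GroupIndex`.

#[test]
fn validator_to_group_worked_example() {
	// Two groups over four validators: group 0 = {0, 1}, group 1 = {2};
	// validator 3 belongs to no group and therefore stays `None`.
	let validator_groups: Vec<Vec<usize>> = vec![vec![0, 1], vec![2]];
	let mut mapping: Vec<Option<u32>> = vec![None; 4];

	for (group_idx, group) in validator_groups.iter().enumerate() {
		for &validator in group {
			mapping[validator] = Some(group_idx as u32);
		}
	}

	assert_eq!(mapping, vec![Some(0), Some(0), Some(1), None]);
}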

/// The state of the subsystem.
struct State {
/// The utility for managing the implicit and explicit views in a consistent way.
@@ -322,9 +470,9 @@ struct State {
/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
/// or explicit view for which a `Seconded` statement has been successfully imported.
per_candidate: HashMap<CandidateHash, PerCandidateState>,
/// Cache the per-session Validator->Group mapping.
validator_to_group_cache:
LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
/// A local cache for storing per-session data. This cache helps to
/// reduce repeated calls to the runtime and avoid redundant computations.
per_session_cache: PerSessionCache,
/// A clonable sender which is dispatched to background candidate validation tasks to inform
/// the main task of the result.
background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
@@ -342,7 +490,7 @@ impl State {
per_leaf: HashMap::default(),
per_relay_parent: HashMap::default(),
per_candidate: HashMap::new(),
validator_to_group_cache: LruMap::new(ByLength::new(2)),
per_session_cache: PerSessionCache::default(),
background_validation_tx,
keystore,
}
@@ -984,7 +1132,7 @@ async fn handle_active_leaves_update<Context>(
ctx,
maybe_new,
&state.keystore,
&mut state.validator_to_group_cache,
&mut state.per_session_cache,
mode,
)
.await?;
@@ -997,23 +1145,6 @@
Ok(())
}

macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(err) => {
// Only bubble up fatal errors.
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(None)
},
}
};
}

fn core_index_from_statement(
validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
group_rotation_info: &GroupRotationInfo,
@@ -1084,17 +1215,13 @@ async fn construct_per_relay_parent_state<Context>(
ctx: &mut Context,
relay_parent: Hash,
keystore: &KeystorePtr,
validator_to_group_cache: &mut LruMap<
SessionIndex,
Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
>,
per_session_cache: &mut PerSessionCache,
mode: ProspectiveParachainsMode,
) -> Result<Option<PerRelayParentState>, Error> {
let parent = relay_parent;

let (session_index, validators, groups, cores) = futures::try_join!(
let (session_index, groups, cores) = futures::try_join!(
request_session_index_for_child(parent, ctx.sender()).await,
request_validators(parent, ctx.sender()).await,
request_validator_groups(parent, ctx.sender()).await,
request_from_runtime(parent, ctx.sender(), |tx| {
RuntimeApiRequest::AvailabilityCores(tx)
@@ -1105,20 +1232,31 @@

let session_index = try_runtime_api!(session_index);

let inject_core_index = request_node_features(parent, session_index, ctx.sender())
let validators_arc = per_session_cache
.get_or_fetch_validators(session_index, parent, ctx.sender())
.await?
.unwrap();
let validators = validators_arc.as_ref().clone();

let node_features_arc = per_session_cache
.get_or_fetch_node_features(session_index, parent, ctx.sender())
.await?
.unwrap_or(NodeFeatures::EMPTY)
.unwrap();

let inject_core_index = node_features_arc
.as_ref()
.get(FeatureIndex::ElasticScalingMVP as usize)
.map(|b| *b)
.unwrap_or(false);

gum::debug!(target: LOG_TARGET, inject_core_index, ?parent, "New state");

let validators: Vec<_> = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let cores = try_runtime_api!(cores);
let minimum_backing_votes =
try_runtime_api!(request_min_backing_votes(parent, session_index, ctx.sender()).await);
let minimum_backing_votes = per_session_cache
.get_or_fetch_minimum_backing_votes(session_index, parent, ctx.sender())
.await?
.unwrap();

// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
@@ -1191,19 +1329,11 @@
}
gum::debug!(target: LOG_TARGET, ?groups, "TableContext");

let validator_to_group = validator_to_group_cache
.get_or_insert(session_index, || {
let mut vector = vec![None; validators.len()];

for (group_idx, validator_group) in validator_groups.iter().enumerate() {
for validator in validator_group {
vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
}
}

Arc::new(IndexedVec::<_, _>::from(vector))
})
.expect("Just inserted");
let validator_to_group = per_session_cache.get_or_compute_validator_to_group(
session_index,
&validators,
&validator_groups,
);

let table_context = TableContext { validator, groups, validators, disabled_validators };
let table_config = TableConfig {
@@ -1228,7 +1358,7 @@
inject_core_index,
n_cores: cores.len() as u32,
claim_queue: ClaimQueueSnapshot::from(claim_queue),
validator_to_group: validator_to_group.clone(),
validator_to_group,
group_rotation_info,
}))
}