Skip to content

Commit

Permalink
approval-voting: remove some inefficiences on startup (#3747)
Browse files Browse the repository at this point in the history
Small refactoring to reduce the algorithmic complexity of the initial
message distribution in approval voting after a sync from O(n_candidates
^ 2) to O(n_candidates).
  • Loading branch information
ordian authored Mar 21, 2024
1 parent 4842faf commit 64a707a
Showing 1 changed file with 129 additions and 145 deletions.
274 changes: 129 additions & 145 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
vstaging::{ApprovalVoteMultipleCandidates, ApprovalVotingParams},
BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement, ExecutorParams,
GroupIndex, Hash, PvfExecKind, SessionIndex, SessionInfo, ValidDisputeStatementKind,
ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, CoreIndex, DisputeStatement,
ExecutorParams, GroupIndex, Hash, PvfExecKind, SessionIndex, SessionInfo,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
Expand Down Expand Up @@ -1285,10 +1285,10 @@ fn cores_to_candidate_indices(

// Map from core index to candidate index.
for claimed_core_index in core_indices.iter_ones() {
if let Some(candidate_index) = block_entry
// Candidates are sorted by core index.
if let Ok(candidate_index) = block_entry
.candidates()
.iter()
.position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
.binary_search_by_key(&(claimed_core_index as u32), |(core_index, _)| core_index.0)
{
candidate_indices.push(candidate_index as _);
}
Expand All @@ -1297,6 +1297,21 @@ fn cores_to_candidate_indices(
CandidateBitfield::try_from(candidate_indices)
}

// Returns the claimed core bitfield from the assignment cert and the core index
// from the block entry.
fn get_core_indices_on_startup(
assignment: &AssignmentCertKindV2,
block_entry_core_index: CoreIndex,
) -> CoreBitfield {
match &assignment {
AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed"),
AssignmentCertKindV2::RelayVRFDelay { core_index } =>
CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"),
}
}

// Returns the claimed core bitfield from the assignment cert, the candidate hash and a
// `BlockEntry`. Can fail only for VRF Delay assignments for which we cannot find the candidate hash
// in the block entry which indicates a bug or corrupted storage.
Expand Down Expand Up @@ -1367,7 +1382,7 @@ async fn distribution_messages_for_activation<Context>(
session: block_entry.session(),
});
let mut signatures_queued = HashSet::new();
for (_, candidate_hash) in block_entry.candidates() {
for (core_index, candidate_hash) in block_entry.candidates() {
let _candidate_span =
distribution_message_span.child("candidate").with_candidate(*candidate_hash);
let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
Expand All @@ -1389,152 +1404,121 @@ async fn distribution_messages_for_activation<Context>(
match approval_entry.local_statements() {
(None, None) | (None, Some(_)) => {}, // second is impossible case.
(Some(assignment), None) => {
if let Some(claimed_core_indices) = get_assignment_core_indices(
&assignment.cert().kind,
&candidate_hash,
&block_entry,
) {
if block_entry.has_candidates_pending_signature() {
delayed_approvals_timers.maybe_arm_timer(
state.clock.tick_now(),
state.clock.as_ref(),
block_entry.block_hash(),
assignment.validator_index(),
)
}
let claimed_core_indices =
get_core_indices_on_startup(&assignment.cert().kind, *core_index);

if block_entry.has_candidates_pending_signature() {
delayed_approvals_timers.maybe_arm_timer(
state.clock.tick_now(),
state.clock.as_ref(),
block_entry.block_hash(),
assignment.validator_index(),
)
}

match cores_to_candidate_indices(
&claimed_core_indices,
&block_entry,
) {
Ok(bitfield) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_entry.candidate_receipt().hash(),
?block_hash,
"Discovered, triggered assignment, not approved yet",
);

let indirect_cert = IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
};
messages.push(
ApprovalDistributionMessage::DistributeAssignment(
indirect_cert.clone(),
bitfield.clone(),
),
);

if !block_entry
.candidate_is_pending_signature(*candidate_hash)
{
let ExtendedSessionInfo { ref executor_params, .. } =
match get_extended_session_info(
session_info_provider,
ctx.sender(),
block_entry.block_hash(),
block_entry.session(),
)
.await
{
Some(i) => i,
None => continue,
};

actions.push(Action::LaunchApproval {
claimed_candidate_indices: bitfield,
candidate_hash: candidate_entry
.candidate_receipt()
.hash(),
indirect_cert,
assignment_tranche: assignment.tranche(),
relay_block_hash: block_hash,
session: block_entry.session(),
executor_params: executor_params.clone(),
candidate: candidate_entry
.candidate_receipt()
.clone(),
backing_group: approval_entry.backing_group(),
distribute_assignment: false,
});
}
},
Err(err) => {
// Should never happen. If we fail here it means the
// assignment is null (no cores claimed).
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
},
}
} else {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Cannot get assignment claimed core indices",
);
match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
Ok(bitfield) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_entry.candidate_receipt().hash(),
?block_hash,
"Discovered, triggered assignment, not approved yet",
);

let indirect_cert = IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
};
messages.push(
ApprovalDistributionMessage::DistributeAssignment(
indirect_cert.clone(),
bitfield.clone(),
),
);

if !block_entry.candidate_is_pending_signature(*candidate_hash)
{
let ExtendedSessionInfo { ref executor_params, .. } =
match get_extended_session_info(
session_info_provider,
ctx.sender(),
block_entry.block_hash(),
block_entry.session(),
)
.await
{
Some(i) => i,
None => continue,
};

actions.push(Action::LaunchApproval {
claimed_candidate_indices: bitfield,
candidate_hash: candidate_entry
.candidate_receipt()
.hash(),
indirect_cert,
assignment_tranche: assignment.tranche(),
relay_block_hash: block_hash,
session: block_entry.session(),
executor_params: executor_params.clone(),
candidate: candidate_entry.candidate_receipt().clone(),
backing_group: approval_entry.backing_group(),
distribute_assignment: false,
});
}
},
Err(err) => {
// Should never happen. If we fail here it means the
// assignment is null (no cores claimed).
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
},
}
},
(Some(assignment), Some(approval_sig)) => {
if let Some(claimed_core_indices) = get_assignment_core_indices(
&assignment.cert().kind,
&candidate_hash,
&block_entry,
) {
match cores_to_candidate_indices(
&claimed_core_indices,
&block_entry,
) {
Ok(bitfield) => messages.push(
ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
bitfield,
),
),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
// If we didn't send assignment, we don't send approval.
continue
},
}
if signatures_queued
.insert(approval_sig.signed_candidates_indices.clone())
{
messages.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVoteV2 {
let claimed_core_indices =
get_core_indices_on_startup(&assignment.cert().kind, *core_index);
match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
Ok(bitfield) => messages.push(
ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCertV2 {
block_hash,
candidate_indices: approval_sig
.signed_candidates_indices,
validator: assignment.validator_index(),
signature: approval_sig.signature,
cert: assignment.cert().clone(),
},
))
};
} else {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Cannot get assignment claimed core indices",
);
bitfield,
),
),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
// If we didn't send assignment, we don't send approval.
continue
},
}
if signatures_queued
.insert(approval_sig.signed_candidates_indices.clone())
{
messages.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVoteV2 {
block_hash,
candidate_indices: approval_sig.signed_candidates_indices,
validator: assignment.validator_index(),
signature: approval_sig.signature,
},
))
};
},
}
},
Expand Down

0 comments on commit 64a707a

Please sign in to comment.