Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

approval-voting: remove some inefficiences on startup #3747

Merged
merged 1 commit into from
Mar 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 129 additions & 145 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
vstaging::{ApprovalVoteMultipleCandidates, ApprovalVotingParams},
BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement, ExecutorParams,
GroupIndex, Hash, PvfExecKind, SessionIndex, SessionInfo, ValidDisputeStatementKind,
ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, CoreIndex, DisputeStatement,
ExecutorParams, GroupIndex, Hash, PvfExecKind, SessionIndex, SessionInfo,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
Expand Down Expand Up @@ -1285,10 +1285,10 @@ fn cores_to_candidate_indices(

// Map from core index to candidate index.
for claimed_core_index in core_indices.iter_ones() {
if let Some(candidate_index) = block_entry
// Candidates are sorted by core index.
if let Ok(candidate_index) = block_entry
.candidates()
.iter()
.position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
.binary_search_by_key(&(claimed_core_index as u32), |(core_index, _)| core_index.0)
{
candidate_indices.push(candidate_index as _);
}
Expand All @@ -1297,6 +1297,21 @@ fn cores_to_candidate_indices(
CandidateBitfield::try_from(candidate_indices)
}

// Returns the claimed core bitfield from the assignment cert and the core index
// from the block entry.
fn get_core_indices_on_startup(
assignment: &AssignmentCertKindV2,
block_entry_core_index: CoreIndex,
) -> CoreBitfield {
match &assignment {
AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => core_bitfield.clone(),
AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
CoreBitfield::try_from(vec![block_entry_core_index]).expect("Not an empty vec; qed"),
AssignmentCertKindV2::RelayVRFDelay { core_index } =>
CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed"),
}
}

// Returns the claimed core bitfield from the assignment cert, the candidate hash and a
// `BlockEntry`. Can fail only for VRF Delay assignments for which we cannot find the candidate hash
// in the block entry which indicates a bug or corrupted storage.
Expand Down Expand Up @@ -1367,7 +1382,7 @@ async fn distribution_messages_for_activation<Context>(
session: block_entry.session(),
});
let mut signatures_queued = HashSet::new();
for (_, candidate_hash) in block_entry.candidates() {
for (core_index, candidate_hash) in block_entry.candidates() {
let _candidate_span =
distribution_message_span.child("candidate").with_candidate(*candidate_hash);
let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
Expand All @@ -1389,152 +1404,121 @@ async fn distribution_messages_for_activation<Context>(
match approval_entry.local_statements() {
(None, None) | (None, Some(_)) => {}, // second is impossible case.
(Some(assignment), None) => {
if let Some(claimed_core_indices) = get_assignment_core_indices(
&assignment.cert().kind,
&candidate_hash,
&block_entry,
) {
if block_entry.has_candidates_pending_signature() {
delayed_approvals_timers.maybe_arm_timer(
state.clock.tick_now(),
state.clock.as_ref(),
block_entry.block_hash(),
assignment.validator_index(),
)
}
let claimed_core_indices =
get_core_indices_on_startup(&assignment.cert().kind, *core_index);

if block_entry.has_candidates_pending_signature() {
delayed_approvals_timers.maybe_arm_timer(
state.clock.tick_now(),
state.clock.as_ref(),
block_entry.block_hash(),
assignment.validator_index(),
)
}

match cores_to_candidate_indices(
&claimed_core_indices,
&block_entry,
) {
Ok(bitfield) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_entry.candidate_receipt().hash(),
?block_hash,
"Discovered, triggered assignment, not approved yet",
);

let indirect_cert = IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
};
messages.push(
ApprovalDistributionMessage::DistributeAssignment(
indirect_cert.clone(),
bitfield.clone(),
),
);

if !block_entry
.candidate_is_pending_signature(*candidate_hash)
{
let ExtendedSessionInfo { ref executor_params, .. } =
match get_extended_session_info(
session_info_provider,
ctx.sender(),
block_entry.block_hash(),
block_entry.session(),
)
.await
{
Some(i) => i,
None => continue,
};

actions.push(Action::LaunchApproval {
claimed_candidate_indices: bitfield,
candidate_hash: candidate_entry
.candidate_receipt()
.hash(),
indirect_cert,
assignment_tranche: assignment.tranche(),
relay_block_hash: block_hash,
session: block_entry.session(),
executor_params: executor_params.clone(),
candidate: candidate_entry
.candidate_receipt()
.clone(),
backing_group: approval_entry.backing_group(),
distribute_assignment: false,
});
}
},
Err(err) => {
// Should never happen. If we fail here it means the
// assignment is null (no cores claimed).
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
},
}
} else {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Cannot get assignment claimed core indices",
);
match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
Ok(bitfield) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_entry.candidate_receipt().hash(),
?block_hash,
"Discovered, triggered assignment, not approved yet",
);

let indirect_cert = IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
};
messages.push(
ApprovalDistributionMessage::DistributeAssignment(
indirect_cert.clone(),
bitfield.clone(),
),
);

if !block_entry.candidate_is_pending_signature(*candidate_hash)
{
let ExtendedSessionInfo { ref executor_params, .. } =
match get_extended_session_info(
session_info_provider,
ctx.sender(),
block_entry.block_hash(),
block_entry.session(),
)
.await
{
Some(i) => i,
None => continue,
};

actions.push(Action::LaunchApproval {
claimed_candidate_indices: bitfield,
candidate_hash: candidate_entry
.candidate_receipt()
.hash(),
indirect_cert,
assignment_tranche: assignment.tranche(),
relay_block_hash: block_hash,
session: block_entry.session(),
executor_params: executor_params.clone(),
candidate: candidate_entry.candidate_receipt().clone(),
backing_group: approval_entry.backing_group(),
distribute_assignment: false,
});
}
},
Err(err) => {
// Should never happen. If we fail here it means the
// assignment is null (no cores claimed).
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
},
}
},
(Some(assignment), Some(approval_sig)) => {
if let Some(claimed_core_indices) = get_assignment_core_indices(
&assignment.cert().kind,
&candidate_hash,
&block_entry,
) {
match cores_to_candidate_indices(
&claimed_core_indices,
&block_entry,
) {
Ok(bitfield) => messages.push(
ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
bitfield,
),
),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
// If we didn't send assignment, we don't send approval.
continue
},
}
if signatures_queued
.insert(approval_sig.signed_candidates_indices.clone())
{
messages.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVoteV2 {
let claimed_core_indices =
get_core_indices_on_startup(&assignment.cert().kind, *core_index);
match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
Ok(bitfield) => messages.push(
ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCertV2 {
block_hash,
candidate_indices: approval_sig
.signed_candidates_indices,
validator: assignment.validator_index(),
signature: approval_sig.signature,
cert: assignment.cert().clone(),
},
))
};
} else {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Cannot get assignment claimed core indices",
);
bitfield,
),
),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
// If we didn't send assignment, we don't send approval.
continue
},
}
if signatures_queued
.insert(approval_sig.signed_candidates_indices.clone())
{
messages.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVoteV2 {
block_hash,
candidate_indices: approval_sig.signed_candidates_indices,
validator: assignment.validator_index(),
signature: approval_sig.signature,
},
))
};
},
}
},
Expand Down
Loading