This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Prefer fetching small PoVs from backing group (#7173)
* impl QueryChunkSize

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* QueryChunkSize message

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* enable fetching from backing group for small pov

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Refactor `bypass_availability_store`

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
sandreim authored May 5, 2023
1 parent 0c9143e commit f046636
Showing 6 changed files with 370 additions and 18 deletions.
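The heart of the change is a size heuristic in availability-recovery: before recovery starts, the subsystem asks av-store for the size of one locally stored erasure chunk and estimates the PoV size as chunk_size * n_validators / 3 (erasure coding roughly triples the data and spreads it across all validators). If the estimate is below `SMALL_POV_LIMIT` (128 KiB), the full data is fetched from the backing group instead of gathering chunks. A minimal standalone sketch of that heuristic, mirroring `launch_recovery_task` below; the function name and the example numbers are illustrative, not taken from the diff:

/// Mirrors `SMALL_POV_LIMIT` from the availability-recovery diff below.
const SMALL_POV_LIMIT: usize = 128 * 1024;

/// Estimate the PoV size from a single chunk's size and decide whether the
/// backing-group fast path should be preferred.
fn prefer_backing_group(chunk_size: usize, n_validators: usize) -> bool {
    let pov_size_estimate = chunk_size.saturating_mul(n_validators) / 3;
    pov_size_estimate < SMALL_POV_LIMIT
}

fn main() {
    // A 300-byte chunk across 1_000 validators estimates a 100_000-byte PoV,
    // which is below 128 KiB (131_072 bytes), so backers are preferred.
    assert!(prefer_backing_group(300, 1_000));
    // A 1_000-byte chunk estimates ~333 KB, so recovery falls back to chunks.
    assert!(!prefer_backing_group(1_000, 1_000));
}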
19 changes: 19 additions & 0 deletions node/core/av-store/src/lib.rs
@@ -1052,6 +1052,25 @@ fn process_message(
let _ =
tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?);
},
AvailabilityStoreMessage::QueryChunkSize(candidate, tx) => {
let meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?;

let validator_index = meta.map_or(None, |meta| meta.chunks_stored.first_one());

let maybe_chunk_size = if let Some(validator_index) = validator_index {
load_chunk(
&subsystem.db,
&subsystem.config,
&candidate,
ValidatorIndex(validator_index as u32),
)?
.map(|erasure_chunk| erasure_chunk.chunk.len())
} else {
None
};

let _ = tx.send(maybe_chunk_size);
},
AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
match load_meta(&subsystem.db, &subsystem.config, &candidate)? {
None => {
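The new `QueryChunkSize` arm above reports the size of whichever chunk happens to be stored locally, chosen by taking the first set bit of `chunks_stored`. A minimal standalone illustration of that lookup using the `bitvec` crate; the helper name is invented for the example:

use bitvec::prelude::*;

/// Return the index of the first locally stored chunk, if any, mirroring
/// `meta.chunks_stored.first_one()` in the handler above.
fn first_stored_chunk(chunks_stored: &BitSlice<u8, Lsb0>) -> Option<usize> {
    chunks_stored.first_one()
}

fn main() {
    // Chunks 2 and 4 are stored; the handler would load chunk 2 and report its size.
    let chunks_stored = bitvec![u8, Lsb0; 0, 0, 1, 0, 1];
    assert_eq!(first_stored_chunk(&chunks_stored), Some(2));

    // No chunks stored yet: the query answers with `None`.
    let empty = bitvec![u8, Lsb0; 0; 5];
    assert_eq!(first_stored_chunk(&empty), None);
}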
48 changes: 48 additions & 0 deletions node/core/av-store/src/tests.rs
@@ -1153,3 +1153,51 @@ async fn import_leaf(

new_leaf
}

#[test]
fn query_chunk_size_works() {
let store = test_store();

test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = ValidatorIndex(5);
let n_validators = 10;

let chunk = ErasureChunk {
chunk: vec![1, 2, 3],
index: validator_index,
proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(),
};

// Ensure an entry already exists. In reality this would come from watching
// chain events.
with_tx(&store, |tx| {
super::write_meta(
tx,
&TEST_CONFIG,
&candidate_hash,
&CandidateMeta {
data_available: false,
chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators],
state: State::Unavailable(BETimestamp(0)),
},
);
});

let (tx, rx) = oneshot::channel();

let chunk_msg =
AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk: chunk.clone(), tx };

overseer_send(&mut virtual_overseer, chunk_msg).await;
assert_eq!(rx.await.unwrap(), Ok(()));

let (tx, rx) = oneshot::channel();
let query_chunk_size = AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx);

overseer_send(&mut virtual_overseer, query_chunk_size).await;

assert_eq!(rx.await.unwrap().unwrap(), chunk.chunk.len());
virtual_overseer
});
}
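Outside the overseer test harness, the request/response shape of `QueryChunkSize` is just a oneshot channel carrying an `Option<usize>`. A simplified, self-contained sketch of that round trip using only the `futures` crate; the `AvStoreRequest` enum and the fixed 3-byte reply are stand-ins for the real subsystem types, echoing the chunk stored in `query_chunk_size_works` above:

use futures::channel::oneshot;
use futures::executor::block_on;

/// Stand-in for the `AvailabilityStoreMessage::QueryChunkSize` variant: a candidate
/// identifier plus a oneshot sender for the optional chunk size.
enum AvStoreRequest {
    QueryChunkSize([u8; 32], oneshot::Sender<Option<usize>>),
}

/// Toy responder standing in for av-store: it always reports a 3-byte chunk.
fn answer(req: AvStoreRequest) {
    match req {
        AvStoreRequest::QueryChunkSize(_candidate, tx) => {
            let _ = tx.send(Some(3));
        },
    }
}

fn main() {
    let (tx, rx) = oneshot::channel();
    answer(AvStoreRequest::QueryChunkSize([0u8; 32], tx));
    assert_eq!(block_on(rx).unwrap(), Some(3));
}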
113 changes: 97 additions & 16 deletions node/network/availability-recovery/src/lib.rs
@@ -99,15 +99,47 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// PoV size limit in bytes for which prefer fetching from backers.
const SMALL_POV_LIMIT: usize = 128 * 1024;

#[derive(Clone, PartialEq)]
/// The strategy we use to recover the PoV.
pub enum RecoveryStrategy {
/// We always try the backing group first, then fallback to validator chunks.
BackersFirstAlways,
/// We try the backing group first if PoV size is lower than specified, then fallback to validator chunks.
BackersFirstIfSizeLower(usize),
/// We always recover using validator chunks.
ChunksAlways,
/// Do not request data from the availability store.
/// This is useful for nodes where the
/// availability-store subsystem is not expected to run,
/// such as collators.
bypass_availability_store: bool,
BypassAvailabilityStore,
}

impl RecoveryStrategy {
/// Returns true if the strategy needs backing group index.
pub fn needs_backing_group(&self) -> bool {
match self {
RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) =>
true,
_ => false,
}
}

fast_path: bool,
/// Returns the PoV size limit in bytes for `BackersFirstIfSizeLower` strategy, otherwise `None`.
pub fn pov_size_limit(&self) -> Option<usize> {
match *self {
RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit),
_ => None,
}
}
}
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// PoV recovery strategy to use.
recovery_strategy: RecoveryStrategy,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
/// Metrics for this subsystem.
@@ -863,10 +895,10 @@ async fn launch_recovery_task<Context>(
ctx: &mut Context,
session_info: SessionInfo,
receipt: CandidateReceipt,
backing_group: Option<GroupIndex>,
mut backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
recovery_strategy: &RecoveryStrategy,
) -> error::Result<()> {
let candidate_hash = receipt.hash();

@@ -877,9 +909,33 @@
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
metrics: metrics.clone(),
bypass_availability_store,
bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore,
};

if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() {
// Get our own chunk size to get an estimate of the PoV size.
let chunk_size: Result<Option<usize>, error::Error> =
query_chunk_size(ctx, candidate_hash).await;
if let Ok(Some(chunk_size)) = chunk_size {
let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3;
let prefer_backing_group = pov_size_estimate < small_pov_limit;

gum::trace!(
target: LOG_TARGET,
?candidate_hash,
pov_size_estimate,
small_pov_limit,
enabled = prefer_backing_group,
"Prefer fetch from backing group",
);

backing_group = backing_group.filter(|_| {
// We keep the backing group only if `1/3` of chunks sum up to less than `small_pov_limit`.
prefer_backing_group
});
}
}

let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g))
.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
@@ -917,8 +973,8 @@ async fn handle_recover<Context>(
session_index: SessionIndex,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
recovery_strategy: &RecoveryStrategy,
) -> error::Result<()> {
let candidate_hash = receipt.hash();

@@ -961,8 +1017,8 @@ async fn query_full_data<Context>(
receipt,
backing_group,
response_sender,
bypass_availability_store,
metrics,
recovery_strategy,
)
.await,
None => {
@@ -988,6 +1044,18 @@ async fn query_full_data<Context>(
rx.await.map_err(error::Error::CanceledQueryFullData)
}

/// Queries a chunk from av-store.
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_chunk_size<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
) -> error::Result<Option<usize>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
.await;

rx.await.map_err(error::Error::CanceledQueryFullData)
}
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
@@ -996,7 +1064,7 @@ impl AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics }
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
@@ -1005,20 +1073,33 @@
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics }
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics }
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
/// above a threshold.
pub fn with_chunks_if_pov_large(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self {
recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
req_receiver,
metrics,
}
}

async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;
let Self { recovery_strategy, mut req_receiver, metrics } = self;

loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
@@ -1045,10 +1126,10 @@ impl AvailabilityRecoverySubsystem {
&mut ctx,
receipt,
session_index,
maybe_backing_group.filter(|_| fast_path),
maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()),
response_sender,
bypass_availability_store,
&metrics,
&recovery_strategy,
).await {
gum::warn!(
target: LOG_TARGET,
Expand All @@ -1064,7 +1145,7 @@ impl AvailabilityRecoverySubsystem {
in_req = recv_req => {
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
Ok(req) => {
if bypass_availability_store {
if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore {
gum::debug!(
target: LOG_TARGET,
"Skipping request to availability-store.",
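Taken together, each constructor above picks one variant of the new enum — from always bypassing the availability store, through always preferring backers, to `with_chunks_only` (`ChunksAlways`) and the new `with_chunks_if_pov_large` (`BackersFirstIfSizeLower(SMALL_POV_LIMIT)`). The backing-group hint is then gated twice: the run loop keeps it only when `needs_backing_group()` holds, and `launch_recovery_task` drops it again when the PoV size estimate is too large. A tiny sketch of that two-stage `Option::filter` gating; the `GroupIndex` wrapper and the boolean flags are stand-ins for the real types and checks:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct GroupIndex(u32);

fn main() {
    // The run loop only keeps the backing-group hint when the strategy can use it...
    let maybe_backing_group = Some(GroupIndex(2));
    let needs_backing_group = true; // e.g. `BackersFirstIfSizeLower(_)`
    let group = maybe_backing_group.filter(|_| needs_backing_group);
    assert_eq!(group, Some(GroupIndex(2)));

    // ...and `launch_recovery_task` drops it again if the PoV estimate is too big.
    let prefer_backing_group = false; // estimate >= SMALL_POV_LIMIT
    assert_eq!(group.filter(|_| prefer_backing_group), None);
}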