availability-distribution: look for leaf ancestors within the same session (#4596)

* availability-distribution: look for leaf ancestors

* Re-use subsystem-util

* Rework ancestry tasks scheduling

* Requester tests

* Improve readability for ancestors lookup
slumber authored Jan 26, 2022
1 parent 728070d commit 02508b4
Showing 5 changed files with 465 additions and 19 deletions.
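
At a high level, this change makes the requester fetch chunks not only for each fresh leaf but also for up to a few of the leaf's ancestors, as long as they belong to the same session. Below is a minimal standalone sketch of that truncation idea; the function name and the pre-resolved `(hash, session)` pairs are illustrative only — the actual implementation in `requester/mod.rs` resolves session indices asynchronously through `RuntimeInfo` and the Chain API, as shown in the diff that follows.

/// Illustrative sketch only: keep the ancestors (parent first) that share the
/// leaf's session. The real subsystem looks up each session index via
/// `RuntimeInfo` instead of receiving it up front.
fn ancestors_in_same_session(leaf_session: u32, ancestors: &[([u8; 32], u32)]) -> Vec<[u8; 32]> {
    ancestors
        .iter()
        .take_while(|(_, session)| *session == leaf_session)
        .map(|(hash, _)| *hash)
        .collect()
}

fn main() {
    // Parent and grandparent are in session 5, the great-grandparent is from
    // session 4, so only the first two ancestors are kept.
    let ancestors = [([1u8; 32], 5), ([2u8; 32], 5), ([3u8; 32], 4)];
    assert_eq!(ancestors_in_same_session(5, &ancestors).len(), 2);
}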
8 changes: 7 additions & 1 deletion node/network/availability-distribution/src/error.rs
@@ -24,7 +24,7 @@ use thiserror::Error;
use futures::channel::oneshot;

use polkadot_node_subsystem_util::runtime;
use polkadot_subsystem::SubsystemError;
use polkadot_subsystem::{ChainApiError, SubsystemError};

use crate::LOG_TARGET;

@@ -63,6 +63,12 @@ pub enum Fatal {
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information: {0}")]
Runtime(#[from] runtime::Fatal),

#[error("Oneshot for receiving response from Chain API got cancelled")]
ChainApiSenderDropped(#[source] oneshot::Canceled),

#[error("Retrieving response from Chain API unexpectedly failed with error: {0}")]
ChainApi(#[from] ChainApiError),
}

/// Non-fatal errors of this subsystem.

@@ -64,7 +64,7 @@ pub struct FetchTask {
/// In other words, for which relay chain parents this candidate is considered live.
/// This is updated on every `ActiveLeavesUpdate` and enables us to know when we can safely
/// stop keeping track of that candidate/chunk.
live_in: HashSet<Hash>,
pub(crate) live_in: HashSet<Hash>,

/// We keep the task around until `live_in` becomes empty, to make
/// sure we won't re-fetch an already fetched candidate.
136 changes: 120 additions & 16 deletions node/network/availability-distribution/src/requester/mod.rs
@@ -27,18 +27,23 @@ use std::{
};

use futures::{
channel::mpsc,
channel::{mpsc, oneshot},
task::{Context, Poll},
Stream,
};

use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
messages::{AllMessages, ChainApiMessage},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
};

use super::{Metrics, LOG_TARGET};
use super::{Metrics, Result, LOG_TARGET};
use crate::error::Fatal;

#[cfg(test)]
mod tests;

/// Cache for session information.
mod session_cache;
@@ -75,6 +80,9 @@ pub struct Requester {
}

impl Requester {
/// How many ancestors of the leaf we should consider along with it.
pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;

/// Create a new `Requester`.
///
/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
@@ -83,6 +91,7 @@ impl Requester {
let (tx, rx) = mpsc::channel(1);
Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
}

/// Update heads that need availability distribution.
///
/// For all active heads we will be fetching our chunks for availability distribution.
@@ -91,43 +100,72 @@
ctx: &mut Context,
runtime: &mut RuntimeInfo,
update: ActiveLeavesUpdate,
) -> super::Result<()>
) -> Result<()>
where
Context: SubsystemContext,
{
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update;
// Stale leaves happen after a reversion - we don't want to re-run availability there.
let activated = activated.and_then(|h| match h.status {
LeafStatus::Stale => None,
LeafStatus::Fresh => Some(h),
});
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
if let Some(activated) = activated {
// Stale leaves happen after a reversion - we don't want to re-run availability there.
if let LeafStatus::Fresh = activated.status {
self.start_requesting_chunks(ctx, runtime, activated).await?;
}
}
self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
}

/// Start requesting chunks for newly imported heads.
/// Start requesting chunks for a newly imported head.
///
/// This will also request up to [`Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION`] leaf ancestors
/// from the same session and start requesting chunks for them too.
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
new_heads: impl Iterator<Item = ActivatedLeaf>,
) -> super::Result<()>
new_head: ActivatedLeaf,
) -> Result<()>
where
Context: SubsystemContext,
{
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = get_occupied_cores(ctx, leaf).await?;
let ActivatedLeaf { hash: leaf, .. } = new_head;
let ancestors_in_session = get_block_ancestors_in_same_session(
ctx,
runtime,
leaf,
Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
leaf = ?leaf,
"Failed to fetch leaf ancestors in the same session due to an error: {}",
err
);
Vec::new()
});
// Also spawn or bump tasks for candidates in the ancestry within the same session.
for hash in std::iter::once(leaf).chain(ancestors_in_session) {
let cores = get_occupied_cores(ctx, hash).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
// Important:
// We mark the whole ancestry as live in the **leaf** hash, so we don't need to track
// any tasks separately.
//
// The next time the subsystem receives a leaf update, some of the spawned tasks will be bumped
// to be live in a fresh relay parent, while others might get dropped because the current leaf
// is deactivated.
self.add_cores(ctx, runtime, leaf, cores).await?;
}

Ok(())
}

Expand All @@ -154,7 +192,7 @@ impl Requester {
runtime: &mut RuntimeInfo,
leaf: Hash,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> super::Result<()>
) -> Result<()>
where
Context: SubsystemContext,
{
@@ -215,3 +253,69 @@ impl Stream for Requester {
}
}
}

/// Requests up to `limit` ancestor hashes of the relay parent within the same session.
async fn get_block_ancestors_in_same_session<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
head: Hash,
limit: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext,
{
// The order is parent, grandparent, ...
//
// `limit + 1` since the session index of an ancestor can only be learned by querying
// its parent, so checking the `limit`-th ancestor requires the `limit + 1`-th entry.
// That extra entry is always dropped, because `session_ancestry_len` can be
// incremented at most `ancestors.len() - 1` times.
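// Illustrative example (not part of the original change): with `limit = 3` we request
// 4 ancestor hashes; the 4th hash is only needed to learn the session index of the 3rd,
// so after the truncation below at most 3 ancestors are returned.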
let mut ancestors = get_block_ancestors(ctx, head, limit + 1).await?;
let mut ancestors_iter = ancestors.iter();

// `head` is the child of the first block in `ancestors`, request its session index.
let head_session_index = match ancestors_iter.next() {
Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
None => {
// No first element, i.e. empty.
return Ok(ancestors)
},
};

let mut session_ancestry_len = 0;
// The first parent is skipped.
for parent in ancestors_iter {
// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
if session_index == head_session_index {
session_ancestry_len += 1;
} else {
break
}
}

// Drop the rest.
ancestors.truncate(session_ancestry_len);

Ok(ancestors)
}

/// Requests up to `limit` ancestor hashes of the relay parent from the Chain API.
async fn get_block_ancestors<Context>(
ctx: &mut Context,
relay_parent: Hash,
limit: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::Ancestors {
hash: relay_parent,
k: limit,
response_channel: tx,
})
.await;

let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?;
Ok(ancestors)
}
