This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

availability-distribution: look for leaf ancestors within the same session #4596

Merged: 5 commits, Jan 26, 2022
Changes from 3 commits
8 changes: 7 additions & 1 deletion node/network/availability-distribution/src/error.rs
@@ -23,7 +23,7 @@ use thiserror::Error;
use futures::channel::oneshot;

use polkadot_node_subsystem_util::runtime;
use polkadot_subsystem::SubsystemError;
use polkadot_subsystem::{ChainApiError, SubsystemError};

use crate::LOG_TARGET;

@@ -62,6 +62,12 @@ pub enum Fatal {
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information: {0}")]
Runtime(#[from] runtime::Fatal),

#[error("Oneshot for receiving response from Chain API got cancelled")]
ChainApiSenderDropped(#[source] oneshot::Canceled),

#[error("Retrieving response from Chain API unexpectedly failed with error: {0}")]
ChainApi(#[from] ChainApiError),
}

/// Non-fatal errors of this subsystem.
131 changes: 115 additions & 16 deletions node/network/availability-distribution/src/requester/mod.rs
@@ -27,18 +27,20 @@ use std::{
};

use futures::{
channel::mpsc,
channel::{mpsc, oneshot},
task::{Context, Poll},
Stream,
};

use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
messages::{AllMessages, ChainApiMessage},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
};

use super::{Metrics, LOG_TARGET};
use super::{Metrics, Result, LOG_TARGET};
use crate::error::Fatal;

/// Cache for session information.
mod session_cache;
@@ -75,6 +77,9 @@ pub struct Requester {
}

impl Requester {
/// How many ancestors of the leaf should we consider along with it.
pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;

/// Create a new `Requester`.
///
/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
@@ -83,6 +88,7 @@ impl Requester {
let (tx, rx) = mpsc::channel(1);
Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
}

/// Update heads that need availability distribution.
///
/// For all active heads we will be fetching our chunks for availability distribution.
@@ -91,43 +97,72 @@ impl Requester {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
update: ActiveLeavesUpdate,
) -> super::Result<()>
) -> Result<()>
Review comment (Contributor): Nice!

where
Context: SubsystemContext,
{
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update;
// Stale leaves happen after a reversion - we don't want to re-run availability there.
let activated = activated.and_then(|h| match h.status {
LeafStatus::Stale => None,
LeafStatus::Fresh => Some(h),
});
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
if let Some(activated) = activated {
// Stale leaves happen after a reversion - we don't want to re-run availability there.
if let LeafStatus::Fresh = activated.status {
Review comment on lines +111 to +113 (Contributor):
Is there really no better way to do this check??

Maybe this is more elegant than two if let's or the ugly thing that was there before:

match activated {
    Some(act) if act.status == LeafStatus::Fresh => { ... },
    _ => {},
}

Review comment (Contributor Author):
You can also express it with

if matches!(activated, Some(activated) if activated.status == LeafStatus::Fresh) {

(provided one derives the PartialEq for status)
But to be honest nested if let is the most readable option to me, 2 more lines is not a big deal
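For reference, here is a self-contained sketch of the two forms discussed above, using simplified stand-in definitions of `ActivatedLeaf` and `LeafStatus` rather than the real `polkadot_subsystem` types (as noted in the thread, the `matches!` variant needs `PartialEq` on the status enum):

#[derive(PartialEq)]
enum LeafStatus {
    Fresh,
    Stale,
}

struct ActivatedLeaf {
    status: LeafStatus,
}

// Variant used in the PR: two nested `if let`s, no `PartialEq` required.
fn handle_nested_if_let(activated: Option<ActivatedLeaf>) {
    if let Some(activated) = activated {
        if let LeafStatus::Fresh = activated.status {
            // `start_requesting_chunks(...)` would be called here.
        }
    }
}

// Variant suggested in the thread: a single `matches!`, requires `PartialEq` on `LeafStatus`.
fn handle_matches(activated: Option<ActivatedLeaf>) {
    if matches!(activated, Some(activated) if activated.status == LeafStatus::Fresh) {
        // `start_requesting_chunks(...)` would be called here.
    }
}

fn main() {
    handle_nested_if_let(Some(ActivatedLeaf { status: LeafStatus::Fresh }));
    handle_matches(Some(ActivatedLeaf { status: LeafStatus::Stale }));
}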

self.start_requesting_chunks(ctx, runtime, activated).await?;
}
}
self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
}

/// Start requesting chunks for newly imported heads.
/// Start requesting chunks for a newly imported head.
///
/// This will also request [`Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION`] leaf ancestors from the same session
/// and start requesting chunks for them too.
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
new_heads: impl Iterator<Item = ActivatedLeaf>,
) -> super::Result<()>
new_head: ActivatedLeaf,
) -> Result<()>
where
Context: SubsystemContext,
{
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = get_occupied_cores(ctx, leaf).await?;
let ActivatedLeaf { hash: leaf, .. } = new_head;
let ancestors_in_session = get_block_ancestors_in_same_session(
ctx,
runtime,
leaf,
Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
leaf = ?leaf,
"Failed to fetch leaf ancestors in the same session due to an error: {}",
err
);
Vec::new()
});
// Also spawn or bump tasks for candidates in ancestry in the same session.
for hash in std::iter::once(leaf).chain(ancestors_in_session) {
Review comment (Contributor): Nice!

let cores = get_occupied_cores(ctx, hash).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
// Important:
// We mark the whole ancestry as live in the **leaf** hash, so we don't need to track
Review comment (@rphmeier, Contributor, Jan 26, 2022):
This comment is great and seems to be correct behavior. I had noticed that add_cores might race with remove_leaf if you used hash.

It would be even more clear if you wrote "We invoke add_cores with the leaf and not with the ancestor hash to avoid a race condition and avoid requesting chunks multiple times" or something like that.

// any tasks separately.
//
// The next time the subsystem receives a leaf update, some of the spawned tasks will be
// bumped to be live in a fresh relay parent, while others might get dropped because the
// current leaf was deactivated.
self.add_cores(ctx, runtime, leaf, cores).await?;
}

Ok(())
}
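To illustrate the bookkeeping described in the comment above, here is a toy model (assumed semantics only, not the subsystem's actual task-tracking code): fetch work is recorded per candidate with the set of leaf hashes it is live in, cores found on an ancestor are still registered under the leaf hash, and deactivating that leaf later removes it again, so no separate per-ancestor tracking is needed.

use std::collections::{HashMap, HashSet};

/// Toy stand-in for task tracking: candidate -> set of leaf hashes it is live in.
struct Fetches(HashMap<&'static str, HashSet<&'static str>>);

impl Fetches {
    /// Register a candidate under `leaf`, even if its core was discovered on an ancestor.
    fn add_core(&mut self, leaf: &'static str, candidate: &'static str) {
        self.0.entry(candidate).or_default().insert(leaf);
    }

    /// Drop `leaf` everywhere; entries that are no longer live anywhere are removed.
    fn remove_leaf(&mut self, leaf: &'static str) {
        self.0.retain(|_, live_in| {
            live_in.remove(leaf);
            !live_in.is_empty()
        });
    }
}

fn main() {
    let mut fetches = Fetches(HashMap::new());
    // A candidate found on an ancestor of leaf A is keyed under A itself ...
    fetches.add_core("leaf-A", "candidate-1");
    // ... so deactivating A cleans it up, with no ancestor hashes to track separately.
    fetches.remove_leaf("leaf-A");
    assert!(fetches.0.is_empty());
}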

@@ -154,7 +189,7 @@ impl Requester {
runtime: &mut RuntimeInfo,
leaf: Hash,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> super::Result<()>
) -> Result<()>
where
Context: SubsystemContext,
{
@@ -215,3 +250,67 @@ impl Stream for Requester {
}
}
}

/// Requests up to `limit` ancestor hashes of relay parent in the same session.
async fn get_block_ancestors_in_same_session<Context>(
Review comment (Contributor):
Logic here seems accurate accounting for the fact that get_session_index is actually returning the session index of the child. Nice

ctx: &mut Context,
runtime: &mut RuntimeInfo,
head: Hash,
limit: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext,
{
// The order is parent, grandparent, ...
//
// `limit + 1` because the session index of the last element in the ancestry
// is obtained through its parent. That extra element always gets truncated, since
// `session_ancestry_len` can be incremented at most `ancestors.len() - 1` times.
let mut ancestors = get_block_ancestors(ctx, head, limit + 1).await?;

// `head` is the child of the first block in `ancestors`, request its session index.
let head_session_index = match ancestors.first() {
Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
None => {
// No first element, i.e. empty.
return Ok(ancestors)
},
};

let mut session_ancestry_len = 0;
for parent in ancestors.iter().skip(1) {
// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
if session_index == head_session_index {
session_ancestry_len += 1;
} else {
break
}
}

// Drop the rest.
ancestors.truncate(session_ancestry_len);

Ok(ancestors)
}

/// Request up to `limit` ancestor hashes of relay parent from the Chain API.
async fn get_block_ancestors<Context>(
ctx: &mut Context,
relay_parent: Hash,
limit: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::Ancestors {
hash: relay_parent,
k: limit,
response_channel: tx,
})
.await;

let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?;
Ok(ancestors)
}
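For clarity, a worked sketch of the truncation logic in `get_block_ancestors_in_same_session`, with the Chain API and runtime calls replaced by precomputed vectors of hypothetical session indices (block numbers stand in for hashes; the real code obtains these via `get_block_ancestors` and `RuntimeInfo::get_session_index`):

/// `sessions[0]` is the session of the head, obtained via its parent `ancestors[0]`;
/// `sessions[i]` for i > 0 is the session of `ancestors[i - 1]`, obtained via `ancestors[i]`.
fn ancestors_in_same_session(mut ancestors: Vec<u32>, sessions: Vec<u32>) -> Vec<u32> {
    let head_session = match sessions.first() {
        Some(session) => *session,
        None => return ancestors,
    };
    // Count how many ancestors share the head's session, stopping at the first mismatch,
    // then drop the rest -- the extra `limit + 1`-th ancestor is always dropped here.
    let same = sessions.iter().skip(1).take_while(|s| **s == head_session).count();
    ancestors.truncate(same);
    ancestors
}

fn main() {
    // Hypothetical chain: the head is block 100 and limit = 3, so `limit + 1 = 4`
    // ancestors are fetched: 99, 98, 97, 96. Blocks 98..=100 are in session 5,
    // block 97 and below are in session 4.
    let ancestors = vec![99, 98, 97, 96];
    let sessions = vec![5, 5, 5, 4]; // sessions of blocks 100, 99, 98, 97 respectively
    // Only the parent and grandparent are in the head's session.
    assert_eq!(ancestors_in_same_session(ancestors, sessions), vec![99, 98]);
}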