
feat(node)!: Refactor BlockRanges and optimize data sampling queue population #320

Merged
47 commits merged on Jul 9, 2024
Commits
36a74e1
wip
oblique Jul 2, 2024
4151687
wip2
oblique Jul 2, 2024
98047d7
wip
oblique Jul 3, 2024
a6f3ce4
wip
oblique Jul 3, 2024
752a788
wip
oblique Jul 3, 2024
1acef6f
wip
oblique Jul 3, 2024
a769851
wip
oblique Jul 3, 2024
2f6a463
wip
oblique Jul 3, 2024
5525687
wip
oblique Jul 3, 2024
ebc3c9b
wip
oblique Jul 3, 2024
322e801
wip
oblique Jul 3, 2024
20a3bac
wip
oblique Jul 3, 2024
b757d5d
migrate redb to new ranges schema
oblique Jul 4, 2024
3798910
cleanup
oblique Jul 4, 2024
22f6cfe
handle pruned headers
oblique Jul 4, 2024
d49b91a
migrate indexeddb to new ranges schema
oblique Jul 4, 2024
d663ba6
cleanup
oblique Jul 4, 2024
7817a1c
cleanup
oblique Jul 4, 2024
b866fdf
start migrating to new algorithm
oblique Jul 4, 2024
b9f7567
wip
oblique Jul 5, 2024
60b0c08
migrate insertion contrains to the new algorithm
oblique Jul 5, 2024
16d99fc
fix clippy
oblique Jul 5, 2024
5b9898a
remove inverted_up_to_head
oblique Jul 5, 2024
dd2924e
rename header_ranges.rs to block_ranges.rs
oblique Jul 5, 2024
adada2a
Merge remote-tracking branch 'origin/main' into fix/daser-queue-bug
oblique Jul 5, 2024
940d202
fix docsrs in ci
oblique Jul 5, 2024
580b5a2
minor
oblique Jul 5, 2024
292dbd8
chore: Fix docs CI
oblique Jul 5, 2024
79e6bc6
fix doc comment
oblique Jul 5, 2024
e44f558
Merge remote-tracking branch 'oblique/chore/docs-ci' into fix/daser-q…
oblique Jul 5, 2024
72b51eb
Merge branch 'main' into fix/daser-queue-bug
oblique Jul 5, 2024
2a47159
Merge remote-tracking branch 'origin/main' into fix/daser-queue-bug
oblique Jul 5, 2024
cfcf4de
consistent naming
oblique Jul 8, 2024
90ec26c
better error messages
oblique Jul 8, 2024
b539de7
move block ranges in root of lumina-node
oblique Jul 8, 2024
164e231
Update node/src/store.rs
oblique Jul 8, 2024
5f75b9e
fmt
oblique Jul 8, 2024
cd6f3d0
apply review suggestions
oblique Jul 8, 2024
892a047
doc
oblique Jul 8, 2024
81e9e4c
fix
oblique Jul 8, 2024
19a228b
Update node/src/block_ranges.rs
oblique Jul 9, 2024
b05364c
apply review suggestions
oblique Jul 9, 2024
d70006e
version specific modules
oblique Jul 9, 2024
386999d
add doc
oblique Jul 9, 2024
1836e3f
doc
oblique Jul 9, 2024
e0686a7
constrains -> constraints
oblique Jul 9, 2024
b095aae
Merge branch 'main' into fix/daser-queue-bug
oblique Jul 9, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -46,7 +46,7 @@ jobs:

- name: Run rustdoc check
env:
RUSTDOCFLAGS: --cfg docs_rs -D warnings
RUSTDOCFLAGS: --cfg docsrs -D warnings
run: cargo +nightly doc
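
This hunk renames the rustdoc cfg from docs_rs to docsrs, the flag name the crate's doc attributes expect. For context, a sketch of how such a cfg is commonly consumed inside a crate — an illustrative pattern only, not copied from this repository, and the feature name below is made up:

// lib.rs — illustrative only. With RUSTDOCFLAGS="--cfg docsrs" (as in the CI
// step above), rustdoc builds the crate with this cfg enabled, so feature-gated
// items can advertise which features they require on docs.rs.
#![cfg_attr(docsrs, feature(doc_cfg))]

// "some-feature" is a hypothetical feature name used only for this example.
#[cfg(feature = "some-feature")]
#[cfg_attr(docsrs, doc(cfg(feature = "some-feature")))]
pub mod some_feature_api {}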


164 changes: 63 additions & 101 deletions node/src/daser.rs
@@ -29,11 +29,10 @@
//! 5. Steps 3 and 4 are repeated concurrently, unless we detect that all peers have disconnected.
//! At that point Daser cleans the queue and moves back to step 1.

use std::collections::{HashSet, VecDeque};
use std::collections::HashSet;
use std::sync::Arc;

use celestia_tendermint::Time;
use celestia_types::ExtendedHeader;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
@@ -47,7 +46,7 @@ use crate::events::{EventPublisher, NodeEvent};
use crate::executor::spawn;
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{HeaderRanges, SamplingStatus, Store, StoreError};
use crate::store::{BlockRanges, SamplingStatus, Store, StoreError};

const MAX_SAMPLES_NEEDED: usize = 16;

@@ -134,18 +133,12 @@
store: Arc<S>,
max_samples_needed: usize,
sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
queue: VecDeque<SamplingArgs>,
prev_stored_blocks: HeaderRanges,
queue: BlockRanges,
done: BlockRanges,
ongoing: BlockRanges,
prev_head: Option<u64>,
}

#[derive(Debug)]
struct SamplingArgs {
height: u64,
square_width: u16,
time: Time,
}

impl<S> Worker<S>
where
S: Store,
@@ -158,8 +151,9 @@
store: args.store,
max_samples_needed: MAX_SAMPLES_NEEDED,
sampling_futs: FuturesUnordered::new(),
queue: VecDeque::new(),
prev_stored_blocks: HeaderRanges::default(),
queue: BlockRanges::default(),
done: BlockRanges::default(),
ongoing: BlockRanges::default(),
prev_head: None,
})
}
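
The queue, done, and ongoing fields initialized above are all BlockRanges, the sorted set of inclusive height ranges that this PR introduces in node/src/block_ranges.rs (not part of this diff). Below is a rough sketch of the subset of its API the worker relies on. The method names come from the diff itself; the internals, the Vec-based representation, and the non-Result signatures are assumptions made for illustration — the real type returns Results (hence the .expect("invalid height") calls) and enforces stricter insertion constraints:

use std::ops::RangeInclusive;

/// Illustrative stand-in for the real BlockRanges: a sorted list of
/// non-overlapping inclusive height ranges. Internals are assumptions.
#[derive(Debug, Default, Clone)]
pub struct BlockRanges(Vec<RangeInclusive<u64>>);

impl BlockRanges {
    /// Highest queued height, if any (used to detect a new HEAD).
    pub fn head(&self) -> Option<u64> {
        self.0.last().map(|r| *r.end())
    }

    /// Remove and return the highest height (sampling goes newest-first).
    pub fn pop_head(&mut self) -> Option<u64> {
        let last = self.0.last_mut()?;
        let head = *last.end();
        if last.start() == last.end() {
            self.0.pop();
        } else {
            let start = *last.start();
            *last = start..=head - 1;
        }
        Some(head)
    }

    /// Insert heights, merging overlapping or adjacent ranges.
    pub fn insert_relaxed(&mut self, range: RangeInclusive<u64>) {
        let (mut start, mut end) = (*range.start(), *range.end());
        self.0.retain(|r| {
            if *r.end() >= start.saturating_sub(1) && *r.start() <= end.saturating_add(1) {
                // Absorb this range into the one being inserted.
                start = start.min(*r.start());
                end = end.max(*r.end());
                false
            } else {
                true
            }
        });
        let pos = self.0.partition_point(|r| *r.end() < start);
        self.0.insert(pos, start..=end);
    }

    /// Remove heights, splitting existing ranges where necessary.
    pub fn remove_relaxed(&mut self, range: RangeInclusive<u64>) {
        let (start, end) = (*range.start(), *range.end());
        let mut kept = Vec::with_capacity(self.0.len() + 1);
        for r in self.0.drain(..) {
            if *r.end() < start || *r.start() > end {
                kept.push(r); // untouched
            } else {
                if *r.start() < start {
                    kept.push(*r.start()..=start - 1); // left remainder
                }
                if *r.end() > end {
                    kept.push(end + 1..=*r.end()); // right remainder
                }
            }
        }
        self.0 = kept;
    }
}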
@@ -229,10 +223,10 @@

loop {
// If we have a new HEAD queued, schedule it now!
if let Some(queue_front) = self.queue.front().map(|args| args.height) {
if queue_front > self.prev_head.unwrap_or(0) {
if let Some(queue_head) = self.queue.head() {
if queue_head > self.prev_head.unwrap_or(0) {
self.schedule_next_sample_block().await?;
self.prev_head = Some(queue_front);
self.prev_head = Some(queue_head);
}
}

@@ -265,6 +259,9 @@
self.store
.update_sampling_metadata(height, status, Vec::new())
.await?;

self.ongoing.remove_relaxed(height..=height).expect("invalid height");
self.done.insert_relaxed(height..=height).expect("invalid height");
},
_ = &mut wait_new_head => {
wait_new_head = store.wait_new_head();
@@ -274,38 +271,63 @@
}

self.sampling_futs.clear();
self.queue.clear();
self.prev_stored_blocks = HeaderRanges::default();
self.queue = BlockRanges::default();
self.ongoing = BlockRanges::default();
self.done = BlockRanges::default();
self.prev_head = None;

Ok(())
}

async fn schedule_next_sample_block(&mut self) -> Result<()> {
let Some(args) = self.queue.pop_front() else {
return Ok(());
// Schedule the most recent un-sampled block.
let header = loop {
let Some(height) = self.queue.pop_head() else {
return Ok(());
};

match self.store.get_by_height(height).await {
Ok(header) => break header,
Err(StoreError::NotFound) => {
// Height was pruned and our queue is inconsistent.
// Repopulate queue and try again.
self.populate_queue().await?;
}
Err(e) => return Err(e.into()),
}
};

let height = header.height().value();
let square_width = header.dah.square_width();

// Make sure that the block is still in the sampling window.
if !in_sampling_window(args.time) {
// Queue is sorted by block height in descending order,
// so as soon as we reach a block that is not in the sampling
if !in_sampling_window(header.time()) {
// As soon as we reach a block that is not in the sampling
// window, it means the rest wouldn't be either.
self.queue.clear();
self.queue
.remove_relaxed(1..=height)
.expect("invalid height");
self.done
.insert_relaxed(1..=height)
.expect("invalid height");
return Ok(());
}

// Select random shares to be sampled
let share_indexes = random_indexes(args.square_width, self.max_samples_needed);
let share_indexes = random_indexes(square_width, self.max_samples_needed);

// Update the CID list before we start sampling, otherwise it's possible for us
// to leak CIDs causing associated blocks to never get cleaned from blockstore.
let cids = share_indexes
.iter()
.map(|(row, col)| sample_cid(*row, *col, args.height))
.map(|(row, col)| sample_cid(*row, *col, height))
.collect::<Result<Vec<_>, _>>()?;

// NOTE: Pruning window is always 1 hour bigger than sampling
// window, so after this `in_sampling_window` if statement we
// shouldn't care about `StoreError::NotFound` anymore.
self.store
.update_sampling_metadata(args.height, SamplingStatus::Unknown, cids)
.update_sampling_metadata(height, SamplingStatus::Unknown, cids)
.await?;

let p2p = self.p2p.clone();
Expand All @@ -316,8 +338,8 @@ where
let now = Instant::now();

event_pub.send(NodeEvent::SamplingStarted {
height: args.height,
square_width: args.square_width,
height,
square_width,
shares: share_indexes.iter().copied().collect(),
});

Expand All @@ -328,7 +350,7 @@ where
let p2p = p2p.clone();

async move {
let res = p2p.get_sample(row, col, args.height).await;
let res = p2p.get_sample(row, col, height).await;
(row, col, res)
}
})
Expand All @@ -351,25 +373,28 @@ where
block_accepted &= share_accepted;

event_pub.send(NodeEvent::ShareSamplingResult {
height: args.height,
square_width: args.square_width,
height,
square_width,
row,
column,
accepted: share_accepted,
});
}

event_pub.send(NodeEvent::SamplingFinished {
height: args.height,
height,
accepted: block_accepted,
took: now.elapsed(),
});

Ok((args.height, block_accepted))
Ok((height, block_accepted))
}
.boxed();

self.sampling_futs.push(fut);
self.ongoing
.insert_relaxed(height..=height)
.expect("invalid height");

Ok(())
}
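
For illustration, the sampling-window cutoff above collapses the old "clear the rest of the queue" logic into two range operations. Heights are invented, and the snippet uses the hypothetical BlockRanges sketch shown earlier:

// Suppose the queue covers 1..=100 and the popped block at height 40 turns out
// to be older than the sampling window.
let mut queue = BlockRanges::default();
let mut done = BlockRanges::default();
queue.insert_relaxed(1..=100);

let height: u64 = 40; // first height found outside the sampling window

// Everything at or below it is dropped and recorded as handled in one go.
queue.remove_relaxed(1..=height); // queue: {41..=100}
done.insert_relaxed(1..=height);  // done:  {1..=40}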
@@ -381,70 +406,15 @@
/// limitation that's coming from bitswap: only way for us to know if sampling
/// failed is via timeout.
async fn populate_queue(&mut self) -> Result<()> {
let stored_blocks = self.store.get_stored_header_ranges().await?;
let first_check = self.prev_stored_blocks.is_empty();

'outer: for block_range in stored_blocks.clone().into_inner().into_iter().rev() {
for height in block_range.rev() {
if self.prev_stored_blocks.contains(height) {
// Skip blocks that were checked before
continue;
}

// Optimization: We check if the block was accepted only if this is
// the first time we check the store (i.e. prev_stored_blocks is empty),
// otherwise we can safely assume that block needs sampling.
if first_check && is_block_accepted(&*self.store, height).await {
// Skip already sampled blocks
continue;
}

let Ok(header) = self.store.get_by_height(height).await else {
// We reached the tail of the known heights
break 'outer;
};
let stored = self.store.get_stored_header_ranges().await?;
let accepted = self.store.get_accepted_sampling_ranges().await?;

if !in_sampling_window(header.time()) {
// We've reached the tail of the sampling window
break 'outer;
}

queue_sampling(&mut self.queue, &header);
}
}

self.prev_stored_blocks = stored_blocks;
self.queue = stored - accepted - &self.done - &self.ongoing;

Ok(())
}
}
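
The rewritten populate_queue above replaces the old per-height loop with set arithmetic over whole ranges; the Sub impls for BlockRanges live in node/src/block_ranges.rs and are not part of this diff. A worked example with invented heights:

// Invented values, illustrating what `stored - accepted - done - ongoing` expresses:
//
//   stored   = {1..=100}           headers currently in the store
//   accepted = {1..=50, 60..=70}   blocks already sampled and accepted
//   done     = {71..=80}           blocks this worker finished in the current run
//   ongoing  = {95..=100}          blocks whose sampling futures are still in flight
//
//   queue = stored - accepted - done - ongoing
//         = {51..=59, 81..=94}
//
// i.e. exactly the stored blocks that still need sampling, computed with a handful
// of range operations instead of one store lookup per height as before.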

/// Queue sampling in descending order
fn queue_sampling(queue: &mut VecDeque<SamplingArgs>, header: &ExtendedHeader) {
let args = SamplingArgs {
height: header.height().value(),
time: header.time(),
square_width: header.dah.square_width(),
};

if queue.is_empty() || args.height >= queue.front().unwrap().height {
queue.push_front(args);
return;
}

if args.height <= queue.back().unwrap().height {
queue.push_back(args);
return;
}

let pos = match queue.binary_search_by(|x| args.height.cmp(&x.height)) {
Ok(pos) => pos,
Err(pos) => pos,
};

queue.insert(pos, args);
}

/// Returns true if `time` is within the sampling window.
fn in_sampling_window(time: Time) -> bool {
let now = Time::now();
@@ -461,14 +431,6 @@ fn in_sampling_window(time: Time) -> bool {
age <= SAMPLING_WINDOW
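
Only fragments of in_sampling_window are visible in this diff's context lines. Pieced together, the check roughly amounts to the sketch below; it is inferred, not copied, and both the SAMPLING_WINDOW value and the future-header handling are assumptions:

use std::time::Duration;

use celestia_tendermint::Time;

// Assumed value, for illustration only; the real constant is defined elsewhere in daser.rs.
const SAMPLING_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Returns true if `time` is within the sampling window.
fn in_sampling_window(time: Time) -> bool {
    let now = Time::now();

    // A header timestamped in the future has no age; treat it as inside the window.
    let Ok(age) = now.duration_since(time) else {
        return true;
    };

    age <= SAMPLING_WINDOW
}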
}

/// Returns true if block has been sampled and accepted.
async fn is_block_accepted(store: &impl Store, height: u64) -> bool {
match store.get_sampling_metadata(height).await {
Ok(Some(metadata)) => metadata.status == SamplingStatus::Accepted,
_ => false,
}
}

/// Returns unique and random indexes that will be used for sampling.
fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
let samples_in_block = usize::from(square_width).pow(2);
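
The rest of random_indexes is cut off in this diff. A hypothetical completion of the helper, consistent with its doc comment but not taken from the repository, could look like this: draw random coordinates until enough unique ones are collected, unless the square is small enough to sample entirely.

use std::collections::HashSet;

use rand::Rng;

fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
    let samples_in_block = usize::from(square_width).pow(2);

    // Sampling the whole block needs no randomness.
    if samples_in_block <= max_samples_needed {
        return (0..square_width)
            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
            .collect();
    }

    let mut rng = rand::thread_rng();
    let mut indexes = HashSet::with_capacity(max_samples_needed);

    while indexes.len() < max_samples_needed {
        indexes.insert((rng.gen_range(0..square_width), rng.gen_range(0..square_width)));
    }

    indexes
}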
4 changes: 2 additions & 2 deletions node/src/p2p.rs
@@ -63,7 +63,7 @@ use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihash
use crate::p2p::swarm::new_swarm;
use crate::peer_tracker::PeerTracker;
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::header_ranges::HeaderRange;
use crate::store::block_ranges::BlockRange;
use crate::store::Store;
use crate::utils::{
celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
@@ -425,7 +425,7 @@ impl P2p {
/// responsibility to verify range edges against headers existing in the store.
pub(crate) async fn get_unverified_header_range(
&self,
range: HeaderRange,
range: BlockRange,
) -> Result<Vec<ExtendedHeader>> {
if range.is_empty() {
return Err(HeaderExError::InvalidRequest.into());
8 changes: 4 additions & 4 deletions node/src/p2p/header_session.rs
@@ -6,15 +6,15 @@ use tracing::debug;
use crate::executor::spawn;
use crate::p2p::header_ex::utils::HeaderRequestExt;
use crate::p2p::{P2pCmd, P2pError};
use crate::store::header_ranges::{HeaderRange, RangeLengthExt};
use crate::store::block_ranges::{BlockRange, BlockRangeExt};

const MAX_AMOUNT_PER_REQ: u64 = 64;
const MAX_CONCURRENT_REQS: usize = 8;

type Result<T, E = P2pError> = std::result::Result<T, E>;

pub(crate) struct HeaderSession {
to_fetch: Option<HeaderRange>,
to_fetch: Option<BlockRange>,
cmd_tx: mpsc::Sender<P2pCmd>,
response_tx: mpsc::Sender<(u64, u64, Result<Vec<ExtendedHeader>>)>,
response_rx: mpsc::Receiver<(u64, u64, Result<Vec<ExtendedHeader>>)>,
@@ -30,7 +30,7 @@ impl HeaderSession {
///
/// [`calculate_fetch_range`] crate::store::utils::calculate_fetch_range
/// [`Store::get_stored_header_ranges`]: crate::store::Store::get_stored_header_ranges
pub(crate) fn new(range: HeaderRange, cmd_tx: mpsc::Sender<P2pCmd>) -> Self {
pub(crate) fn new(range: BlockRange, cmd_tx: mpsc::Sender<P2pCmd>) -> Self {
let (response_tx, response_rx) = mpsc::channel(MAX_CONCURRENT_REQS);

HeaderSession {
@@ -140,7 +140,7 @@ impl HeaderSession {
}

/// take a next batch of up to `limit` headers from the front of the `range_to_fetch`
fn take_next_batch(range_to_fetch: &mut Option<HeaderRange>, limit: u64) -> Option<HeaderRange> {
fn take_next_batch(range_to_fetch: &mut Option<BlockRange>, limit: u64) -> Option<BlockRange> {
// calculate potential end offset before we modify range_to_fetch
let end_offset = limit.checked_sub(1)?;
