feat(node): Add syncing window for header sync (#309)
Signed-off-by: Mikołaj Florkiewicz <mikolaj@florkiewicz.me>
Co-authored-by: Maciej Zwoliński <mac.zwolinski@gmail.com>
fl0rek and zvolin authored Jul 1, 2024
1 parent 74123f3 commit c842ed3
Showing 4 changed files with 231 additions and 53 deletions.
50 changes: 41 additions & 9 deletions node/src/p2p/header_session.rs
@@ -9,7 +9,7 @@ use crate::p2p::{P2pCmd, P2pError};
use crate::store::header_ranges::{HeaderRange, RangeLengthExt};

const MAX_AMOUNT_PER_REQ: u64 = 64;
const MAX_CONCURRENT_REQS: usize = 1;
const MAX_CONCURRENT_REQS: usize = 8;

type Result<T, E = P2pError> = std::result::Result<T, E>;

@@ -139,16 +139,18 @@ impl HeaderSession {
}
}

/// take a next batch of up to `limit` headers from the front of the `range_to_fetch`
fn take_next_batch(range_to_fetch: &mut Option<HeaderRange>, limit: u64) -> Option<HeaderRange> {
// calculate potential end before we modify range_to_fetch
// calculate potential end offset before we modify range_to_fetch
let end_offset = limit.checked_sub(1)?;

let to_fetch = range_to_fetch.take()?;
if to_fetch.len() <= limit {
Some(to_fetch)
} else {
let _ = range_to_fetch.insert(*to_fetch.start() + limit..=*to_fetch.end());
Some(*to_fetch.start()..=*to_fetch.start() + end_offset)
// to_fetch.len() > limit, we shouldn't underflow here
let _ = range_to_fetch.insert(*to_fetch.start()..=*to_fetch.end() - limit);
Some(*to_fetch.end() - end_offset..=*to_fetch.end())
}
}

@@ -201,18 +203,18 @@ mod tests {
result_tx.send(res).unwrap();
});

for i in 0..8 {
for i in (0..8).rev() {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1 + 64 * i);
assert_eq!(height, 9 + 64 * i);
assert_eq!(amount, 64);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 513);
assert_eq!(height, 1);
assert_eq!(amount, 8);
let start = (height - 1) as usize;
let end = start + amount as usize;
@@ -299,8 +301,38 @@ mod tests {
fn take_next_batch_truncated_batch() {
let mut range_to_fetch = Some(1..=10);
let batch = take_next_batch(&mut range_to_fetch, 5);
assert_eq!(batch, Some(1..=5));
assert_eq!(range_to_fetch, Some(6..=10));
assert_eq!(batch, Some(6..=10));
assert_eq!(range_to_fetch, Some(1..=5));
}

#[test]
fn take_next_batch_truncated_calc() {
let mut range_to_fetch = Some(1..=512);

let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(449..=512));
assert_eq!(range_to_fetch, Some(1..=448));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(385..=448));
assert_eq!(range_to_fetch, Some(1..=384));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(321..=384));
assert_eq!(range_to_fetch, Some(1..=320));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(257..=320));
assert_eq!(range_to_fetch, Some(1..=256));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(193..=256));
assert_eq!(range_to_fetch, Some(1..=192));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(129..=192));
assert_eq!(range_to_fetch, Some(1..=128));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(65..=128));
assert_eq!(range_to_fetch, Some(1..=64));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(1..=64));
assert_eq!(range_to_fetch, None);
}

#[test]
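The core change in this file is that `MAX_CONCURRENT_REQS` goes from 1 to 8 and `take_next_batch` now hands out batches from the back of the remaining range, so the newest headers are requested first. Below is a minimal standalone sketch of that batching order, with `HeaderRange` simplified to `RangeInclusive<u64>` and a local length helper standing in for `RangeLengthExt` (both simplifications are assumptions made for the sake of a runnable example, not the crate's actual types):

```rust
use std::ops::RangeInclusive;

/// Simplified stand-in for the crate's `HeaderRange`.
type HeaderRange = RangeInclusive<u64>;

/// Inclusive length of a non-empty range (stand-in for `RangeLengthExt`).
fn range_len(r: &HeaderRange) -> u64 {
    r.end() - r.start() + 1
}

/// Take up to `limit` headers from the *back* (newest side) of `range_to_fetch`.
fn take_next_batch(range_to_fetch: &mut Option<HeaderRange>, limit: u64) -> Option<HeaderRange> {
    // a limit of 0 yields no batch
    let end_offset = limit.checked_sub(1)?;
    let to_fetch = range_to_fetch.take()?;

    if range_len(&to_fetch) <= limit {
        Some(to_fetch)
    } else {
        // keep the older part for later, hand out the newest `limit` headers now
        *range_to_fetch = Some(*to_fetch.start()..=*to_fetch.end() - limit);
        Some(*to_fetch.end() - end_offset..=*to_fetch.end())
    }
}

fn main() {
    let mut remaining = Some(1..=512);
    // the first batch covers the newest headers, matching the new test expectations
    assert_eq!(take_next_batch(&mut remaining, 64), Some(449..=512));
    assert_eq!(remaining, Some(1..=448));
}
```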
99 changes: 68 additions & 31 deletions node/src/store/utils.rs
@@ -3,7 +3,7 @@ use std::ops::RangeInclusive;
use celestia_types::ExtendedHeader;

use crate::executor::yield_now;
use crate::store::header_ranges::HeaderRange;
use crate::store::header_ranges::{HeaderRange, RangeLengthExt};
use crate::store::{Result, StoreError};

pub(crate) const VALIDATIONS_PER_YIELD: usize = 4;
@@ -13,34 +13,48 @@ pub(crate) const VALIDATIONS_PER_YIELD: usize = 4;
pub(crate) fn calculate_range_to_fetch(
head_height: u64,
store_headers: &[RangeInclusive<u64>],
syncing_window_edge: Option<u64>,
limit: u64,
) -> HeaderRange {
let mut missing_range = get_most_recent_missing_range(head_height, store_headers);

// truncate to syncing window, if height is known
if let Some(window_edge) = syncing_window_edge {
if missing_range.start() < &window_edge {
missing_range = window_edge + 1..=*missing_range.end();
}
}

// truncate number of headers to limit
if missing_range.len() > limit {
let end = missing_range.end();
let start = end.saturating_sub(limit) + 1;
missing_range = start..=*end;
}

missing_range
}

fn get_most_recent_missing_range(
head_height: u64,
store_headers: &[RangeInclusive<u64>],
) -> HeaderRange {
let mut store_headers_iter = store_headers.iter().rev();

let Some(store_head_range) = store_headers_iter.next() else {
// empty store, just fetch from head
return head_height.saturating_sub(limit) + 1..=head_height;
// empty store, we're missing everything
return 1..=head_height;
};

if store_head_range.end() != &head_height {
if store_head_range.end() < &head_height {
// if we haven't caught up with network head, start from there
let fetch_start = u64::max(
store_head_range.end() + 1,
head_height.saturating_sub(limit) + 1,
);
return fetch_start..=head_height;
return store_head_range.end() + 1..=head_height;
}

// there exists a range contiguous with network head. inspect previous range end
let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0);

let fetch_end = store_head_range.start().saturating_sub(1);
let fetch_start = u64::max(
penultimate_range_end + 1,
fetch_end.saturating_sub(limit) + 1,
);

fetch_start..=fetch_end
penultimate_range_end + 1..=store_head_range.start().saturating_sub(1)
}

pub(crate) fn try_consolidate_ranges(
@@ -129,60 +143,83 @@ mod tests {
let head_height = 1024;
let ranges = [256..=512];

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 16);
assert_eq!(fetch_range, 1009..=1024);

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 511);
assert_eq!(fetch_range, 514..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 512);
assert_eq!(fetch_range, 513..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 513);
assert_eq!(fetch_range, 513..=1024);

let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 1024);
assert_eq!(fetch_range, 513..=1024);
let fetch_range = calculate_range_to_fetch(head_height, &ranges, Some(900), 1024);
assert_eq!(fetch_range, 901..=1024);
}

#[test]
fn calculate_range_to_fetch_empty_store() {
let fetch_range = calculate_range_to_fetch(1, &[], 100);
let fetch_range = calculate_range_to_fetch(1, &[], None, 100);
assert_eq!(fetch_range, 1..=1);

let fetch_range = calculate_range_to_fetch(100, &[], 10);
let fetch_range = calculate_range_to_fetch(100, &[], None, 10);
assert_eq!(fetch_range, 91..=100);

let fetch_range = calculate_range_to_fetch(100, &[], Some(75), 50);
assert_eq!(fetch_range, 76..=100);
}

#[test]
fn calculate_range_to_fetch_fully_synced() {
let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
let fetch_range = calculate_range_to_fetch(1, &[1..=1], None, 100);
assert!(fetch_range.is_empty());

let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
let fetch_range = calculate_range_to_fetch(100, &[1..=100], None, 10);
assert!(fetch_range.is_empty());

let fetch_range = calculate_range_to_fetch(100, &[1..=100], Some(100), 10);
assert!(fetch_range.is_empty());
}

#[test]
fn calculate_range_to_fetch_caught_up() {
let head_height = 4000;

let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], None, 500);
assert_eq!(fetch_range, 2500..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], Some(2600), 500);
assert_eq!(fetch_range, 2601..=2999);
let fetch_range =
calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], None, 500);
assert_eq!(fetch_range, 2500..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
let fetch_range =
calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], None, 500);
assert_eq!(fetch_range, 2801..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
let fetch_range =
calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], Some(2000), 500);
assert_eq!(fetch_range, 2801..=2999);
let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], None, 500);
assert_eq!(fetch_range, 1..=299);
let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], Some(2000), 500);
assert!(fetch_range.is_empty());
}

#[test]
fn calculate_range_to_fetch_catching_up() {
let head_height = 4000;

let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], None, 500);
assert_eq!(fetch_range, 3501..=4000);
let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], Some(3600), 500);
assert_eq!(fetch_range, 3601..=4000);
let fetch_range =
calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], None, 500);
assert_eq!(fetch_range, 3801..=4000);
let fetch_range =
calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], Some(3900), 500);
assert_eq!(fetch_range, 3901..=4000);
}

#[test]
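In this file, `calculate_range_to_fetch` gains a `syncing_window_edge: Option<u64>` parameter: the most recent missing range is first clamped to heights above the estimated window edge and only then truncated to the newest `limit` headers. The sketch below condenses that clamping order into one helper that operates on a precomputed missing range (a simplification; the real function derives that range from `store_headers` via `get_most_recent_missing_range`):

```rust
use std::ops::RangeInclusive;

/// Clamp an already-computed "most recent missing range" to the syncing
/// window edge, then to the batch size limit, keeping the newest headers.
fn clamp_to_window_and_limit(
    mut missing_range: RangeInclusive<u64>,
    syncing_window_edge: Option<u64>,
    limit: u64,
) -> RangeInclusive<u64> {
    // truncate to the syncing window, if the edge height is known
    if let Some(window_edge) = syncing_window_edge {
        if *missing_range.start() < window_edge {
            missing_range = window_edge + 1..=*missing_range.end();
        }
    }

    // keep only the newest `limit` headers of what is left
    let len = if missing_range.is_empty() {
        0
    } else {
        missing_range.end() - missing_range.start() + 1
    };
    if len > limit {
        let end = *missing_range.end();
        missing_range = end.saturating_sub(limit) + 1..=end;
    }

    missing_range
}

fn main() {
    // mirrors `calculate_range_to_fetch(4000, &[3000..=4000], None, 500)` from the tests
    assert_eq!(clamp_to_window_and_limit(1..=2999, None, 500), 2500..=2999);
    // with a window edge at 2600, only the part above it is scheduled
    assert_eq!(clamp_to_window_and_limit(1..=2999, Some(2600), 500), 2601..=2999);
    // a gap entirely below the window edge yields an empty range
    assert!(clamp_to_window_and_limit(1..=299, Some(2000), 500).is_empty());
}
```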
74 changes: 74 additions & 0 deletions node/src/syncer.rs
@@ -15,6 +15,7 @@ use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use celestia_tendermint::Time;
use celestia_types::ExtendedHeader;
use futures::FutureExt;
use serde::{Deserialize, Serialize};
@@ -36,6 +37,7 @@ type Result<T, E = SyncerError> = std::result::Result<T, E>;

const MAX_HEADERS_IN_BATCH: u64 = 512;
const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
const SYNCING_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60); // 30 days

/// Representation of all the errors that can occur when interacting with the [`Syncer`].
#[derive(Debug, thiserror::Error)]
@@ -180,6 +182,7 @@ where
headers_tx: mpsc::Sender<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
headers_rx: mpsc::Receiver<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
ongoing_batch: Option<Ongoing>,
estimated_syncing_window_end: Option<u64>,
}

struct Ongoing {
@@ -210,6 +213,7 @@ where
headers_tx,
headers_rx,
ongoing_batch: None,
estimated_syncing_window_end: None,
})
}

@@ -466,6 +470,7 @@ where
let next_batch = calculate_range_to_fetch(
subjective_head_height,
store_ranges.as_ref(),
self.estimated_syncing_window_end,
MAX_HEADERS_IN_BATCH,
);

@@ -474,6 +479,14 @@
return;
}

// make sure we're inside the syncing window before we start
if let Ok(known_header) = self.store.get_by_height(next_batch.end() + 1).await {
if !in_syncing_window(&known_header) {
self.estimated_syncing_window_end = Some(known_header.height().value());
return;
}
}

self.event_pub.send(NodeEvent::FetchingHeadersStarted {
from_height: *next_batch.start(),
to_height: *next_batch.end(),
@@ -540,6 +553,15 @@ where
}
}

fn in_syncing_window(header: &ExtendedHeader) -> bool {
let syncing_window_start = Time::now().checked_sub(SYNCING_WINDOW).unwrap_or_else(|| {
warn!("underflow when computing syncing window start, defaulting to unix epoch");
Time::unix_epoch()
});

header.time().after(syncing_window_start)
}

async fn try_init<S>(p2p: &P2p, store: &S) -> Result<u64>
where
S: Store,
Expand Down Expand Up @@ -713,6 +735,58 @@ mod tests {
p2p_mock.expect_no_cmd().await;
}

#[async_test]
async fn syncing_window_edge() {
let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
let mut gen = ExtendedHeaderGenerator::new();
gen.set_time((Time::now() - month_and_day_ago).expect("to not underflow"));
let mut headers = gen.next_many(1200);
gen.set_time(Time::now());
headers.append(&mut gen.next_many(2049 - 1200));

let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

// Syncer requested the first batch ([1537..=2048])
handle_session_batch(
&mut p2p_mock,
&headers,
vec![
(1537, 64),
(1601, 64),
(1665, 64),
(1729, 64),
(1793, 64),
(1857, 64),
(1921, 64),
(1985, 64),
],
)
.await;
assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

// Syncer requested the second batch ([1025, 1536]) hitting the syncing window
handle_session_batch(
&mut p2p_mock,
&headers,
vec![
(1025, 64),
(1089, 64),
(1153, 64),
(1217, 64),
(1281, 64),
(1345, 64),
(1409, 64),
(1473, 64),
],
)
.await;
assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

// Syncer is fully synced and awaiting for events
p2p_mock.expect_no_cmd().await;
}

#[async_test]
async fn start_with_filled_store() {
let events = EventChannel::new();
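The syncer side introduces `SYNCING_WINDOW` (30 days), the cached `estimated_syncing_window_end`, and `in_syncing_window`: before fetching a batch, the syncer looks up the already-stored header just above the batch, and if that header's timestamp falls outside the window it records the height and stops scheduling older ranges. A minimal sketch of the time check follows, using `std::time` instead of `celestia_tendermint::Time` (an assumption to keep the example dependency-free):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// 30-day syncing window, matching `SYNCING_WINDOW` in the diff.
const SYNCING_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// A header is inside the syncing window if its timestamp is newer than
/// `now - SYNCING_WINDOW`.
fn in_syncing_window(header_time: SystemTime) -> bool {
    // fall back to the unix epoch if the subtraction underflows,
    // mirroring the `Time::unix_epoch()` fallback in the real code
    let window_start = SystemTime::now()
        .checked_sub(SYNCING_WINDOW)
        .unwrap_or(UNIX_EPOCH);

    header_time > window_start
}

fn main() {
    let fresh = SystemTime::now() - Duration::from_secs(24 * 60 * 60); // 1 day old
    let stale = SystemTime::now() - Duration::from_secs(40 * 24 * 60 * 60); // 40 days old

    assert!(in_syncing_window(fresh));
    assert!(!in_syncing_window(stale));
    // in the syncer, a `false` result stores that header's height in
    // `estimated_syncing_window_end` and skips fetching older batches
}
```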
