Skip to content

Commit

Permalink
chore: Single buffer block sync (#1318)
Browse files Browse the repository at this point in the history
Related issues:
- Closes FuelLabs/fuel-core#1167

---------

Co-authored-by: Brandon Kite <brandonkite92@gmail.com>
Co-authored-by: xgreenx <xgreenx9999@gmail.com>
  • Loading branch information
3 people committed Aug 28, 2023
1 parent 9de2fab commit 533da83
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 136 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ Description of the upcoming release here.

### Changed

- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Modified block synchronization to use asynchronous task execution when retrieving block headers.
- [#1314](https://github.com/FuelLabs/fuel-core/pull/1314): Removed `types::ConsensusParameters` in favour of `fuel_tx:ConsensusParameters`.
- [#1302](https://github.com/FuelLabs/fuel-core/pull/1302): Removed the usage of flake and building of the bridge contract ABI.
It simplifies the maintenance and updating of the events, requiring only putting the event definition into the codebase of the relayer.
- [#1293](https://github.com/FuelLabs/fuel-core/issues/1293): Parallelized the `estimate_predicates` endpoint to utilize all available threads.
- [#1270](https://github.com/FuelLabs/fuel-core/pull/1270): Modify the way block headers are retrieved from peers to be done in batches.

#### Breaking
- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Removed the `--sync-max-header-batch-requests` CLI argument, and renamed `--sync-max-get-txns` to `--sync-block-stream-buffer-size` to better represent the current behavior in the import.
- [#1279](https://github.com/FuelLabs/fuel-core/pull/1279): Added a new CLI flag to enable the Relayer service `--enable-relayer`, and disabled the Relayer service by default. When supplying the `--enable-relayer` flag, the `--relayer` argument becomes mandatory, and omitting it is an error. Similarly, providing a `--relayer` argument without the `--enable-relayer` flag is an error. Lastly, providing the `--keypair` or `--network` arguments will also produce an error if the `--enable-p2p` flag is not set.
- [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent.
- [#1290](https://github.com/FuelLabs/fuel-core/pull/1290): Standardize CLI args to use `-` instead of `_`
Expand Down
47 changes: 22 additions & 25 deletions benches/benches/import.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use criterion::{
criterion_group,
criterion_main,
measurement::WallTime,
BenchmarkGroup,
Criterion,
};
use fuel_core_benches::import::{
Expand All @@ -23,26 +21,28 @@ async fn execute_import(import: PressureImport, shutdown: &mut StateWatcher) {
import.import(shutdown).await.unwrap();
}

fn name(n: u32, durations: Durations, buffer_size: usize) -> String {
fn name(n: u32, durations: Durations, batch_size: u32, buffer_size: usize) -> String {
format!(
"import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {sz}",
"import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {bas}/{bus}",
n = n,
d_h = durations.headers.as_millis(),
d_c = durations.consensus.as_millis(),
d_t = durations.transactions.as_millis(),
d_e = durations.executes.as_millis(),
sz = buffer_size
bas = batch_size,
bus = buffer_size
)
}

fn bench_imports(c: &mut Criterion) {
let bench_import = |group: &mut BenchmarkGroup<WallTime>,
let bench_import = |c: &mut Criterion,
n: u32,
durations: Durations,
batch_size: u32,
buffer_size: usize| {
let name = name(n, durations, buffer_size);
group.bench_function(name, move |b| {
let name = name(n, durations, batch_size, buffer_size);
let mut group = c.benchmark_group(format!("import {}", name));
group.bench_function("bench", move |b| {
let rt = Runtime::new().unwrap();
b.to_async(&rt).iter_custom(|iters| async move {
let mut elapsed_time = Duration::default();
Expand All @@ -56,7 +56,6 @@ fn bench_imports(c: &mut Criterion) {
durations,
batch_size,
buffer_size,
buffer_size,
);
import.notify_one();
let start = std::time::Instant::now();
Expand All @@ -68,33 +67,31 @@ fn bench_imports(c: &mut Criterion) {
});
};

let mut group = c.benchmark_group("import");

let n = 100;
let durations = Durations {
headers: Duration::from_millis(5),
consensus: Duration::from_millis(5),
transactions: Duration::from_millis(5),
executes: Duration::from_millis(10),
headers: Duration::from_millis(10),
consensus: Duration::from_millis(10),
transactions: Duration::from_millis(10),
executes: Duration::from_millis(5),
};

// Header batch size = 10, header/txn buffer size = 10
bench_import(&mut group, n, durations, 10, 10);
bench_import(c, n, durations, 10, 10);

// Header batch size = 20, header/txn buffer size = 10
bench_import(&mut group, n, durations, 20, 10);
// Header batch size = 10, header/txn buffer size = 25
bench_import(c, n, durations, 10, 25);

// Header batch size = 50, header/txn buffer size = 10
bench_import(&mut group, n, durations, 20, 10);
// Header batch size = 10, header/txn buffer size = 50
bench_import(c, n, durations, 10, 50);

// Header batch size = 10, header/txn buffer size = 20
bench_import(&mut group, n, durations, 10, 20);
// Header batch size = 25, header/txn buffer size = 10
bench_import(c, n, durations, 25, 10);

// Header batch size = 10, header/txn buffer size = 50
bench_import(&mut group, n, durations, 10, 50);
// Header batch size = 50, header/txn buffer size = 10
bench_import(c, n, durations, 50, 10);

// Header batch size = 50, header/txn buffer size = 50
bench_import(&mut group, n, durations, 10, 20);
bench_import(c, n, durations, 50, 50);
}

criterion_group!(benches, bench_imports);
Expand Down
6 changes: 2 additions & 4 deletions benches/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,16 @@ pub fn provision_import_test(
shared_state: SharedMutex<State>,
input: Durations,
header_batch_size: u32,
max_header_batch_requests: usize,
max_get_txns_requests: usize,
block_stream_buffer_size: usize,
) -> (
PressureImport,
Sender<fuel_core_services::State>,
StateWatcher,
) {
let shared_notify = Arc::new(Notify::new());
let params = Config {
max_header_batch_requests,
header_batch_size,
max_get_txns_requests,
block_stream_buffer_size,
};
let p2p = Arc::new(PressurePeerToPeer::new(
shared_count.clone(),
Expand Down
10 changes: 3 additions & 7 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,11 @@ pub struct P2PArgs {
#[derive(Debug, Clone, Args)]
pub struct SyncArgs {
/// The maximum number of get transaction requests to make in a single batch.
#[clap(long = "sync-max-get-txns", default_value = "10", env)]
pub max_get_txns_requests: usize,
#[clap(long = "sync-block-stream-buffer-size", default_value = "10", env)]
pub block_stream_buffer_size: usize,
/// The maximum number of headers to request in a single batch.
#[clap(long = "sync-header-batch-size", default_value = "10", env)]
pub header_batch_size: u32,
/// The maximum number of header batch requests to have active at one time.
#[clap(long = "sync-max-header-batch-requests", default_value = "10", env)]
pub max_header_batch_requests: usize,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -218,9 +215,8 @@ impl KeypairArg {
impl From<SyncArgs> for fuel_core::sync::Config {
fn from(value: SyncArgs) -> Self {
Self {
max_get_txns_requests: value.max_get_txns_requests,
block_stream_buffer_size: value.block_stream_buffer_size,
header_batch_size: value.header_batch_size,
max_header_batch_requests: value.max_header_batch_requests,
}
}
}
Expand Down
159 changes: 100 additions & 59 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
//! importing blocks from the network into the local blockchain.
use std::{
future::Future,
iter,
ops::RangeInclusive,
sync::Arc,
};

use anyhow::anyhow;
use fuel_core_services::{
SharedMutex,
StateWatcher,
Expand All @@ -23,6 +26,7 @@ use fuel_core_types::{
};
use futures::{
stream::StreamExt,
FutureExt,
Stream,
};
use tokio::sync::Notify;
Expand Down Expand Up @@ -56,19 +60,16 @@ mod back_pressure_tests;
/// Parameters for the import task.
pub struct Config {
/// The maximum number of get transaction requests to make in a single batch.
pub max_get_txns_requests: usize,
pub block_stream_buffer_size: usize,
/// The maximum number of headers to request in a single batch.
pub header_batch_size: u32,
/// The maximum number of header batch requests to have active at one time.
pub max_header_batch_requests: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
max_get_txns_requests: 10,
block_stream_buffer_size: 10,
header_batch_size: 100,
max_header_batch_requests: 10,
}
}
}
Expand Down Expand Up @@ -174,18 +175,30 @@ where
..
} = &self;

get_headers_buffered(range.clone(), params, p2p.clone())
.map({
let p2p = p2p.clone();
let consensus_port = consensus.clone();
move |result| {
Self::get_block_for_header(result, p2p.clone(), consensus_port.clone())
}
.instrument(tracing::debug_span!("consensus_and_transactions"))
.in_current_span()
let shutdown_signal = shutdown.clone();
let (shutdown_guard, mut shutdown_guard_recv) =
tokio::sync::mpsc::channel::<()>(1);
let block_stream =
get_block_stream(range.clone(), params, p2p.clone(), consensus.clone());
let result = block_stream
.map(move |stream_block_batch| {
let shutdown_guard = shutdown_guard.clone();
let shutdown_signal = shutdown_signal.clone();
tokio::spawn(async move {
// Hold a shutdown sender for the lifetime of the spawned task
let _shutdown_guard = shutdown_guard.clone();
let mut shutdown_signal = shutdown_signal.clone();
tokio::select! {
// Stream a batch of blocks
blocks = stream_block_batch => blocks,
// If a shutdown signal is received during the stream, terminate early and
// return an empty response
_ = shutdown_signal.while_started() => Ok(None)
}
}).then(|task| async { task.map_err(|e| anyhow!(e))? })
})
// Request up to `max_get_txns_requests` transactions from the network.
.buffered(params.max_get_txns_requests)
// Request up to `block_stream_buffer_size` transactions from the network.
.buffered(params.block_stream_buffer_size)
// Continue the stream unless an error or none occurs.
// Note the error will be returned but the stream will close.
.into_scan_none_or_err()
Expand Down Expand Up @@ -230,66 +243,58 @@ where
}
})
.in_current_span()
.await
.await;

// Wait for any spawned tasks to shutdown
let _ = shutdown_guard_recv.recv().await;
result
}
}

async fn get_block_for_header(
result: anyhow::Result<SourcePeer<SealedBlockHeader>>,
p2p: Arc<P>,
consensus_port: Arc<C>,
) -> anyhow::Result<Option<SealedBlock>> {
let header = match result {
Ok(h) => h,
Err(e) => return Err(e),
};
let SourcePeer {
peer_id,
data: header,
} = header;
let id = header.entity.id();
let block_id = SourcePeer { peer_id, data: id };

// Check the consensus is valid on this header.
if !consensus_port
.check_sealed_header(&header)
.trace_err("Failed to check consensus on header")?
{
tracing::warn!("Header {:?} failed consensus check", header);
return Ok(None)
fn get_block_stream<
P: PeerToPeerPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
>(
range: RangeInclusive<u32>,
params: &Config,
p2p: Arc<P>,
consensus: Arc<C>,
) -> impl Stream<Item = impl Future<Output = anyhow::Result<Option<SealedBlock>>>> {
get_header_stream(range, params, p2p.clone()).map({
let p2p = p2p.clone();
let consensus_port = consensus.clone();
move |batch| {
{
let p2p = p2p.clone();
let consensus_port = consensus_port.clone();
get_sealed_blocks(batch, p2p.clone(), consensus_port.clone())
}
.instrument(tracing::debug_span!("consensus_and_transactions"))
.in_current_span()
}

// Wait for the da to be at least the da height on the header.
consensus_port
.await_da_height(&header.entity.da_height)
.await?;

get_transactions_on_block(p2p.as_ref(), block_id, header).await
}
})
}

fn get_headers_buffered<P: PeerToPeerPort + Send + Sync + 'static>(
fn get_header_stream<P: PeerToPeerPort + Send + Sync + 'static>(
range: RangeInclusive<u32>,
params: &Config,
p2p: Arc<P>,
) -> impl Stream<Item = anyhow::Result<SourcePeer<SealedBlockHeader>>> {
let Config {
header_batch_size,
max_header_batch_requests,
..
header_batch_size, ..
} = params;
futures::stream::iter(range_chunks(range, *header_batch_size))
.map(move |range| {
let ranges = range_chunks(range, *header_batch_size);
let p2p_gen = iter::repeat_with(move || p2p.clone());
let iter = ranges.zip(p2p_gen);
futures::stream::iter(iter)
.then(move |(range, p2p)| async {
tracing::debug!(
"getting header range from {} to {} inclusive",
range.start(),
range.end()
);
let p2p = p2p.clone();
async move { get_headers_batch(range, p2p).await }
.instrument(tracing::debug_span!("get_headers_batch"))
.in_current_span()
get_headers_batch(range, p2p).await
})
.buffered(*max_header_batch_requests)
.flatten()
.into_scan_none_or_err()
.scan_none_or_err()
Expand All @@ -306,6 +311,42 @@ fn range_chunks(
})
}

async fn get_sealed_blocks<
P: PeerToPeerPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
>(
result: anyhow::Result<SourcePeer<SealedBlockHeader>>,
p2p: Arc<P>,
consensus_port: Arc<C>,
) -> anyhow::Result<Option<SealedBlock>> {
let header = match result {
Ok(h) => h,
Err(e) => return Err(e),
};
let SourcePeer {
peer_id,
data: header,
} = header;
let id = header.entity.id();
let block_id = SourcePeer { peer_id, data: id };

// Check the consensus is valid on this header.
if !consensus_port
.check_sealed_header(&header)
.trace_err("Failed to check consensus on header")?
{
tracing::warn!("Header {:?} failed consensus check", header);
return Ok(None)
}

// Wait for the da to be at least the da height on the header.
consensus_port
.await_da_height(&header.entity.da_height)
.await?;

get_transactions_on_block(p2p.as_ref(), block_id, header).await
}

/// Waits for a notify or shutdown signal.
/// Returns true if the notify signal was received.
async fn wait_for_notify_or_shutdown(
Expand Down
Loading

0 comments on commit 533da83

Please sign in to comment.