diff --git a/CHANGELOG.md b/CHANGELOG.md index f2cf9722..9f98e6a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Description of the upcoming release here. ### Changed +- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Modified block synchronization to use asynchronous task execution when retrieving block headers. - [#1314](https://github.com/FuelLabs/fuel-core/pull/1314): Removed `types::ConsensusParameters` in favour of `fuel_tx:ConsensusParameters`. - [#1302](https://github.com/FuelLabs/fuel-core/pull/1302): Removed the usage of flake and building of the bridge contract ABI. It simplifies the maintenance and updating of the events, requiring only putting the event definition into the codebase of the relayer. @@ -26,6 +27,7 @@ Description of the upcoming release here. - [#1270](https://github.com/FuelLabs/fuel-core/pull/1270): Modify the way block headers are retrieved from peers to be done in batches. #### Breaking +- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Removed the `--sync-max-header-batch-requests` CLI argument, and renamed `--sync-max-get-txns` to `--sync-block-stream-buffer-size` to better represent the current behavior in the import. - [#1279](https://github.com/FuelLabs/fuel-core/pull/1279): Added a new CLI flag to enable the Relayer service `--enable-relayer`, and disabled the Relayer service by default. When supplying the `--enable-relayer` flag, the `--relayer` argument becomes mandatory, and omitting it is an error. Similarly, providing a `--relayer` argument without the `--enable-relayer` flag is an error. Lastly, providing the `--keypair` or `--network` arguments will also produce an error if the `--enable-p2p` flag is not set. - [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent. - [#1290](https://github.com/FuelLabs/fuel-core/pull/1290): Standardize CLI args to use `-` instead of `_` diff --git a/benches/benches/import.rs b/benches/benches/import.rs index c1c1a854..c8402b37 100644 --- a/benches/benches/import.rs +++ b/benches/benches/import.rs @@ -1,8 +1,6 @@ use criterion::{ criterion_group, criterion_main, - measurement::WallTime, - BenchmarkGroup, Criterion, }; use fuel_core_benches::import::{ @@ -23,26 +21,28 @@ async fn execute_import(import: PressureImport, shutdown: &mut StateWatcher) { import.import(shutdown).await.unwrap(); } -fn name(n: u32, durations: Durations, buffer_size: usize) -> String { +fn name(n: u32, durations: Durations, batch_size: u32, buffer_size: usize) -> String { format!( - "import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {sz}", + "import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {bas}/{bus}", n = n, d_h = durations.headers.as_millis(), d_c = durations.consensus.as_millis(), d_t = durations.transactions.as_millis(), d_e = durations.executes.as_millis(), - sz = buffer_size + bas = batch_size, + bus = buffer_size ) } fn bench_imports(c: &mut Criterion) { - let bench_import = |group: &mut BenchmarkGroup, + let bench_import = |c: &mut Criterion, n: u32, durations: Durations, batch_size: u32, buffer_size: usize| { - let name = name(n, durations, buffer_size); - group.bench_function(name, move |b| { + let name = name(n, durations, batch_size, buffer_size); + let mut group = c.benchmark_group(format!("import {}", name)); + group.bench_function("bench", move |b| { let rt = Runtime::new().unwrap(); b.to_async(&rt).iter_custom(|iters| async move { let mut elapsed_time = Duration::default(); @@ -56,7 +56,6 @@ fn bench_imports(c: &mut Criterion) { durations, batch_size, buffer_size, - buffer_size, ); import.notify_one(); let start = std::time::Instant::now(); @@ -68,33 +67,31 @@ fn bench_imports(c: &mut Criterion) { }); }; - let mut group = c.benchmark_group("import"); - let n = 100; let durations = Durations { - headers: Duration::from_millis(5), - consensus: Duration::from_millis(5), - transactions: Duration::from_millis(5), - executes: Duration::from_millis(10), + headers: Duration::from_millis(10), + consensus: Duration::from_millis(10), + transactions: Duration::from_millis(10), + executes: Duration::from_millis(5), }; // Header batch size = 10, header/txn buffer size = 10 - bench_import(&mut group, n, durations, 10, 10); + bench_import(c, n, durations, 10, 10); - // Header batch size = 20, header/txn buffer size = 10 - bench_import(&mut group, n, durations, 20, 10); + // Header batch size = 10, header/txn buffer size = 25 + bench_import(c, n, durations, 10, 25); - // Header batch size = 50, header/txn buffer size = 10 - bench_import(&mut group, n, durations, 20, 10); + // Header batch size = 10, header/txn buffer size = 50 + bench_import(c, n, durations, 10, 50); - // Header batch size = 10, header/txn buffer size = 20 - bench_import(&mut group, n, durations, 10, 20); + // Header batch size = 25, header/txn buffer size = 10 + bench_import(c, n, durations, 25, 10); - // Header batch size = 10, header/txn buffer size = 50 - bench_import(&mut group, n, durations, 10, 50); + // Header batch size = 50, header/txn buffer size = 10 + bench_import(c, n, durations, 50, 10); // Header batch size = 50, header/txn buffer size = 50 - bench_import(&mut group, n, durations, 10, 20); + bench_import(c, n, durations, 50, 50); } criterion_group!(benches, bench_imports); diff --git a/benches/src/import.rs b/benches/src/import.rs index 3d79206b..88f26857 100644 --- a/benches/src/import.rs +++ b/benches/src/import.rs @@ -40,8 +40,7 @@ pub fn provision_import_test( shared_state: SharedMutex, input: Durations, header_batch_size: u32, - max_header_batch_requests: usize, - max_get_txns_requests: usize, + block_stream_buffer_size: usize, ) -> ( PressureImport, Sender, @@ -49,9 +48,8 @@ pub fn provision_import_test( ) { let shared_notify = Arc::new(Notify::new()); let params = Config { - max_header_batch_requests, header_batch_size, - max_get_txns_requests, + block_stream_buffer_size, }; let p2p = Arc::new(PressurePeerToPeer::new( shared_count.clone(), diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index ea1fda55..b48631ea 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -180,14 +180,11 @@ pub struct P2PArgs { #[derive(Debug, Clone, Args)] pub struct SyncArgs { /// The maximum number of get transaction requests to make in a single batch. - #[clap(long = "sync-max-get-txns", default_value = "10", env)] - pub max_get_txns_requests: usize, + #[clap(long = "sync-block-stream-buffer-size", default_value = "10", env)] + pub block_stream_buffer_size: usize, /// The maximum number of headers to request in a single batch. #[clap(long = "sync-header-batch-size", default_value = "10", env)] pub header_batch_size: u32, - /// The maximum number of header batch requests to have active at one time. - #[clap(long = "sync-max-header-batch-requests", default_value = "10", env)] - pub max_header_batch_requests: usize, } #[derive(Clone, Debug)] @@ -218,9 +215,8 @@ impl KeypairArg { impl From for fuel_core::sync::Config { fn from(value: SyncArgs) -> Self { Self { - max_get_txns_requests: value.max_get_txns_requests, + block_stream_buffer_size: value.block_stream_buffer_size, header_batch_size: value.header_batch_size, - max_header_batch_requests: value.max_header_batch_requests, } } } diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index 19658e22..cd9c355a 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -3,10 +3,13 @@ //! importing blocks from the network into the local blockchain. use std::{ + future::Future, + iter, ops::RangeInclusive, sync::Arc, }; +use anyhow::anyhow; use fuel_core_services::{ SharedMutex, StateWatcher, @@ -23,6 +26,7 @@ use fuel_core_types::{ }; use futures::{ stream::StreamExt, + FutureExt, Stream, }; use tokio::sync::Notify; @@ -56,19 +60,16 @@ mod back_pressure_tests; /// Parameters for the import task. pub struct Config { /// The maximum number of get transaction requests to make in a single batch. - pub max_get_txns_requests: usize, + pub block_stream_buffer_size: usize, /// The maximum number of headers to request in a single batch. pub header_batch_size: u32, - /// The maximum number of header batch requests to have active at one time. - pub max_header_batch_requests: usize, } impl Default for Config { fn default() -> Self { Self { - max_get_txns_requests: 10, + block_stream_buffer_size: 10, header_batch_size: 100, - max_header_batch_requests: 10, } } } @@ -174,18 +175,30 @@ where .. } = &self; - get_headers_buffered(range.clone(), params, p2p.clone()) - .map({ - let p2p = p2p.clone(); - let consensus_port = consensus.clone(); - move |result| { - Self::get_block_for_header(result, p2p.clone(), consensus_port.clone()) - } - .instrument(tracing::debug_span!("consensus_and_transactions")) - .in_current_span() + let shutdown_signal = shutdown.clone(); + let (shutdown_guard, mut shutdown_guard_recv) = + tokio::sync::mpsc::channel::<()>(1); + let block_stream = + get_block_stream(range.clone(), params, p2p.clone(), consensus.clone()); + let result = block_stream + .map(move |stream_block_batch| { + let shutdown_guard = shutdown_guard.clone(); + let shutdown_signal = shutdown_signal.clone(); + tokio::spawn(async move { + // Hold a shutdown sender for the lifetime of the spawned task + let _shutdown_guard = shutdown_guard.clone(); + let mut shutdown_signal = shutdown_signal.clone(); + tokio::select! { + // Stream a batch of blocks + blocks = stream_block_batch => blocks, + // If a shutdown signal is received during the stream, terminate early and + // return an empty response + _ = shutdown_signal.while_started() => Ok(None) + } + }).then(|task| async { task.map_err(|e| anyhow!(e))? }) }) - // Request up to `max_get_txns_requests` transactions from the network. - .buffered(params.max_get_txns_requests) + // Request up to `block_stream_buffer_size` transactions from the network. + .buffered(params.block_stream_buffer_size) // Continue the stream unless an error or none occurs. // Note the error will be returned but the stream will close. .into_scan_none_or_err() @@ -230,66 +243,58 @@ where } }) .in_current_span() - .await + .await; + + // Wait for any spawned tasks to shutdown + let _ = shutdown_guard_recv.recv().await; + result } +} - async fn get_block_for_header( - result: anyhow::Result>, - p2p: Arc

, - consensus_port: Arc, - ) -> anyhow::Result> { - let header = match result { - Ok(h) => h, - Err(e) => return Err(e), - }; - let SourcePeer { - peer_id, - data: header, - } = header; - let id = header.entity.id(); - let block_id = SourcePeer { peer_id, data: id }; - - // Check the consensus is valid on this header. - if !consensus_port - .check_sealed_header(&header) - .trace_err("Failed to check consensus on header")? - { - tracing::warn!("Header {:?} failed consensus check", header); - return Ok(None) +fn get_block_stream< + P: PeerToPeerPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +>( + range: RangeInclusive, + params: &Config, + p2p: Arc

, + consensus: Arc, +) -> impl Stream>>> { + get_header_stream(range, params, p2p.clone()).map({ + let p2p = p2p.clone(); + let consensus_port = consensus.clone(); + move |batch| { + { + let p2p = p2p.clone(); + let consensus_port = consensus_port.clone(); + get_sealed_blocks(batch, p2p.clone(), consensus_port.clone()) + } + .instrument(tracing::debug_span!("consensus_and_transactions")) + .in_current_span() } - - // Wait for the da to be at least the da height on the header. - consensus_port - .await_da_height(&header.entity.da_height) - .await?; - - get_transactions_on_block(p2p.as_ref(), block_id, header).await - } + }) } -fn get_headers_buffered( +fn get_header_stream( range: RangeInclusive, params: &Config, p2p: Arc

, ) -> impl Stream>> { let Config { - header_batch_size, - max_header_batch_requests, - .. + header_batch_size, .. } = params; - futures::stream::iter(range_chunks(range, *header_batch_size)) - .map(move |range| { + let ranges = range_chunks(range, *header_batch_size); + let p2p_gen = iter::repeat_with(move || p2p.clone()); + let iter = ranges.zip(p2p_gen); + futures::stream::iter(iter) + .then(move |(range, p2p)| async { tracing::debug!( "getting header range from {} to {} inclusive", range.start(), range.end() ); - let p2p = p2p.clone(); - async move { get_headers_batch(range, p2p).await } - .instrument(tracing::debug_span!("get_headers_batch")) - .in_current_span() + get_headers_batch(range, p2p).await }) - .buffered(*max_header_batch_requests) .flatten() .into_scan_none_or_err() .scan_none_or_err() @@ -306,6 +311,42 @@ fn range_chunks( }) } +async fn get_sealed_blocks< + P: PeerToPeerPort + Send + Sync + 'static, + C: ConsensusPort + Send + Sync + 'static, +>( + result: anyhow::Result>, + p2p: Arc

, + consensus_port: Arc, +) -> anyhow::Result> { + let header = match result { + Ok(h) => h, + Err(e) => return Err(e), + }; + let SourcePeer { + peer_id, + data: header, + } = header; + let id = header.entity.id(); + let block_id = SourcePeer { peer_id, data: id }; + + // Check the consensus is valid on this header. + if !consensus_port + .check_sealed_header(&header) + .trace_err("Failed to check consensus on header")? + { + tracing::warn!("Header {:?} failed consensus check", header); + return Ok(None) + } + + // Wait for the da to be at least the da height on the header. + consensus_port + .await_da_height(&header.entity.da_height) + .await?; + + get_transactions_on_block(p2p.as_ref(), block_id, header).await +} + /// Waits for a notify or shutdown signal. /// Returns true if the notify signal was received. async fn wait_for_notify_or_shutdown( diff --git a/crates/services/sync/src/import/back_pressure_tests.rs b/crates/services/sync/src/import/back_pressure_tests.rs index 6dea34a7..c58fc970 100644 --- a/crates/services/sync/src/import/back_pressure_tests.rs +++ b/crates/services/sync/src/import/back_pressure_tests.rs @@ -21,9 +21,8 @@ struct Input { #[test_case( Input::default(), State::new(None, None), Config{ - max_get_txns_requests: 1, + block_stream_buffer_size: 1, header_batch_size: 1, - max_header_batch_requests: 1, } => Count::default() ; "Empty sanity test" )] @@ -34,9 +33,8 @@ struct Input { }, State::new(None, 0), Config{ - max_get_txns_requests: 1, + block_stream_buffer_size: 1, header_batch_size: 1, - max_header_batch_requests: 1, } => is less_or_equal_than Count{ headers: 1, consensus: 1, transactions: 1, executes: 1, blocks: 1 } ; "Single with slow headers" @@ -48,9 +46,8 @@ struct Input { }, State::new(None, 100), Config{ - max_get_txns_requests: 10, + block_stream_buffer_size: 10, header_batch_size: 10, - max_header_batch_requests: 1, } => is less_or_equal_than Count{ headers: 10, consensus: 10, transactions: 10, executes: 1, blocks: 21 } ; "100 headers with max 10 with slow headers" @@ -62,9 +59,8 @@ struct Input { }, State::new(None, 100), Config{ - max_get_txns_requests: 10, + block_stream_buffer_size: 10, header_batch_size: 10, - max_header_batch_requests: 1, } => is less_or_equal_than Count{ headers: 10, consensus: 10, transactions: 10, executes: 1, blocks: 21 } ; "100 headers with max 10 with slow transactions" @@ -76,9 +72,8 @@ struct Input { }, State::new(None, 50), Config{ - max_get_txns_requests: 10, + block_stream_buffer_size: 10, header_batch_size: 10, - max_header_batch_requests: 1, } => is less_or_equal_than Count{ headers: 10, consensus: 10, transactions: 10, executes: 1, blocks: 21 } ; "50 headers with max 10 with slow executes" @@ -90,9 +85,8 @@ struct Input { }, State::new(None, 50), Config{ - max_get_txns_requests: 10, + block_stream_buffer_size: 10, header_batch_size: 10, - max_header_batch_requests: 10, } => is less_or_equal_than Count{ headers: 10, consensus: 10, transactions: 10, executes: 1, blocks: 21 } ; "50 headers with max 10 size and max 10 requests" diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs index 796dc700..d81a9bea 100644 --- a/crates/services/sync/src/import/tests.rs +++ b/crates/services/sync/src/import/tests.rs @@ -53,13 +53,17 @@ async fn import__signature_fails_on_header_4_only() { let mut consensus_port = MockConsensusPort::default(); consensus_port .expect_check_sealed_header() - .times(1) + .times(2) .returning(|h| Ok(**h.entity.height() != 4)); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); let state = State::new(3, 5).into(); let mocks = Mocks { consensus_port, - p2p: DefaultMocks::times([0]), + p2p: DefaultMocks::times([1]), executor: DefaultMocks::times([0]), }; @@ -172,13 +176,13 @@ async fn import__transactions_not_found() { .times(1) .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); p2p.expect_get_transactions() - .times(1) + .times(2) .returning(|_| Ok(None)); let state = State::new(3, 5).into(); let mocks = Mocks { p2p, - consensus_port: DefaultMocks::times([1]), + consensus_port: DefaultMocks::times([2]), executor: DefaultMocks::times([0]), }; @@ -197,7 +201,7 @@ async fn import__transactions_not_found_for_header_4() { .times(1) .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); let mut height = 3; - p2p.expect_get_transactions().times(1).returning(move |_| { + p2p.expect_get_transactions().times(2).returning(move |_| { height += 1; if height == 4 { Ok(None) @@ -209,7 +213,7 @@ async fn import__transactions_not_found_for_header_4() { let state = State::new(3, 5).into(); let mocks = Mocks { p2p, - consensus_port: DefaultMocks::times([1]), + consensus_port: DefaultMocks::times([2]), executor: DefaultMocks::times([0]), }; @@ -281,7 +285,7 @@ async fn import__p2p_error_on_4_transactions() { .times(1) .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); let mut height = 3; - p2p.expect_get_transactions().times(1).returning(move |_| { + p2p.expect_get_transactions().times(2).returning(move |_| { height += 1; if height == 4 { Err(anyhow::anyhow!("Some network error")) @@ -293,7 +297,7 @@ async fn import__p2p_error_on_4_transactions() { let state = State::new(3, 5).into(); let mocks = Mocks { p2p, - consensus_port: DefaultMocks::times([1]), + consensus_port: DefaultMocks::times([2]), executor: DefaultMocks::times([0]), }; @@ -341,7 +345,7 @@ async fn import__consensus_error_on_4() { let mut consensus_port = MockConsensusPort::default(); consensus_port .expect_check_sealed_header() - .times(1) + .times(2) .returning(|h| { if **h.entity.height() == 4 { Err(anyhow::anyhow!("Some consensus error")) @@ -349,11 +353,15 @@ async fn import__consensus_error_on_4() { Ok(true) } }); + consensus_port + .expect_await_da_height() + .times(1) + .returning(|_| Ok(())); let state = State::new(3, 5).into(); let mocks = Mocks { consensus_port, - p2p: DefaultMocks::times([0]), + p2p: DefaultMocks::times([1]), executor: DefaultMocks::times([0]), }; @@ -414,8 +422,8 @@ async fn import__execution_error_on_header_4() { let state = State::new(3, 5).into(); let mocks = Mocks { - consensus_port: DefaultMocks::times([1]), - p2p: DefaultMocks::times([1]), + consensus_port: DefaultMocks::times([2]), + p2p: DefaultMocks::times([2]), executor, }; @@ -461,7 +469,7 @@ async fn signature_always_fails() { let mut consensus_port = MockConsensusPort::default(); consensus_port .expect_check_sealed_header() - .times(1) + .times(2) .returning(|_| Ok(false)); let state = State::new(3, 5).into(); @@ -520,9 +528,8 @@ async fn test_import_inner( executor, } = mocks; let params = Config { - max_get_txns_requests: 10, + block_stream_buffer_size: 10, header_batch_size: 10, - max_header_batch_requests: 10, }; let p2p = Arc::new(p2p); diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs index 04617bb9..bd1d5912 100644 --- a/crates/services/sync/src/service/tests.rs +++ b/crates/services/sync/src/service/tests.rs @@ -59,9 +59,8 @@ async fn test_new_service() { .returning(|_| Ok(true)); consensus.expect_await_da_height().returning(|_| Ok(())); let params = Config { - max_get_txns_requests: 10, + block_stream_buffer_size: 10, header_batch_size: 10, - max_header_batch_requests: 10, }; let s = new_service(4u32.into(), p2p, importer, consensus, params).unwrap(); diff --git a/deployment/charts/templates/fuel-core-deploy.yaml b/deployment/charts/templates/fuel-core-deploy.yaml index 2066b38e..4b274eba 100644 --- a/deployment/charts/templates/fuel-core-deploy.yaml +++ b/deployment/charts/templates/fuel-core-deploy.yaml @@ -220,18 +220,14 @@ spec: - "--max-transmit-size" - "{{ .Values.app.max_transmit_size }}" {{- end }} - {{- if .Values.app.sync_max_get_txns }} - - "--sync-max-get-txns" - - "{{ .Values.app.sync_max_get_txns }}" + {{- if .Values.app.sync_block_stream_size }} + - "--sync-block-stream-size" + - "{{ .Values.app.sync_block_stream_size }}" {{- end }} - {{- if .Values.app.sync_header_batch_size}} + {{- if .Values.app.sync_header_batch_size }} - "--sync-header-batch-size" - "{{ .Values.app.sync_header_batch_size }}" {{- end }} - {{- if .Values.app.sync_max_header_batch_requests }} - - "--sync-max-header-batch-requests" - - "{{ .Values.app.sync_max_header_batch_requests }}" - {{- end }} {{- if .Values.app.reserved_nodes_only_mode }} - "--reserved-nodes-only-mode" {{- end}} diff --git a/deployment/charts/values.yaml b/deployment/charts/values.yaml index 384b6932..5f6dcafa 100644 --- a/deployment/charts/values.yaml +++ b/deployment/charts/values.yaml @@ -21,9 +21,8 @@ app: max_headers_per_request: "${fuel_core_max_headers_per_request}" max_database_cache_size: "${fuel_core_max_database_cache_size}" max_transmit_size: "${fuel_core_max_buffer_size}" - sync_max_get_txns: "${fuel_core_sync_max_get_txns}" + sync_block_stream_size: "${fuel_core_sync_block_stream_size}" sync_header_batch_size: "${fuel_core_sync_header_batch_size}" - sync_max_header_batch_requests: "${fuel_core_sync_max_header_batch_requests}" p2p_key: ${fuel_core_p2p_key} allow_private_addresses: ${fuel_core_allow_private_addresses} reserved_nodes_only_mode: ${fuel_core_reserved_only} diff --git a/deployment/scripts/.env b/deployment/scripts/.env index f225ae81..d7bb380a 100644 --- a/deployment/scripts/.env +++ b/deployment/scripts/.env @@ -65,9 +65,8 @@ fuel_core_reserved_nodes="/dns4/test.test.svc.cluster.local/tcp/30333/p2p/16Uiu2 # fuel_core_bootstrap_nodes="/dns4/test.test.svc.cluster.local/tcp/30333/p2p/16Uiu2HAmEB6RQuDfEZjvosRRundrEddfGqgRq51EReNV9E4pfDw5,/dns4/sentry-3/tcp/30333/16Uiu2HAmEB6RQuDfEZjvosRRundrEddfGqgRq51EReNV9E4pfDw5" # Sync Environment Variables -fuel_core_sync_max_get_txns="10" +fuel_core_sync_block_stream_size="10" fuel_core_sync_header_batch_size="10" -fuel_core_sync_max_header_batch_requests="10" # Ingress Environment variables fuel_core_ingress_dns="node.example.com"