Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speeds up synchronisation of the blocks for the fuel-core-sync service #1916

Merged
merged 28 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
59bd7dc
Removed dead code
xgreenx May 29, 2024
5ba7821
Updated CHANGELOG.md
xgreenx May 29, 2024
cfc3029
Fixed halting of the node in rare conditions
xgreenx May 29, 2024
e27ac53
Updated CHANGELOG.md
xgreenx May 29, 2024
9ad525a
This change improves and simplifies some aspects of the P2P service. …
xgreenx May 29, 2024
fb1b8fd
Updated CHANGELOG.md
xgreenx May 29, 2024
22acbaa
Merge branch 'master' into feature/fixed-dead-lock
xgreenx May 29, 2024
54c9852
Merge branch 'feature/fixed-dead-lock' into feature/fixed-p2p-reconne…
xgreenx May 29, 2024
93f0ed9
The change speeds up synchronization and removes noise logs when the …
xgreenx May 29, 2024
f9b3a58
Updated CHANGELOG.md
xgreenx May 29, 2024
c626716
Make CI happy
xgreenx May 29, 2024
a9a1533
Merge branch 'master' into feature/fixed-dead-lock
xgreenx May 29, 2024
7df6235
Merge branch 'feature/fixed-dead-lock' into feature/fixed-p2p-reconne…
xgreenx May 29, 2024
8eac617
Merge branch 'feature/fixed-p2p-reconnection-issue' into feature/spee…
xgreenx May 29, 2024
47b5c07
Merge branch 'master' into feature/fixed-p2p-reconnection-issue
xgreenx May 30, 2024
e71729a
Merge branch 'feature/fixed-p2p-reconnection-issue' into feature/spee…
xgreenx May 30, 2024
241e6ec
Found bug when execution fails
xgreenx Jun 2, 2024
6b8e671
Merge branch 'master' into feature/fixed-p2p-reconnection-issue
xgreenx Jun 3, 2024
f5650f5
Merge branch 'feature/fixed-p2p-reconnection-issue' into feature/spee…
xgreenx Jun 3, 2024
324be3f
Merge branch 'master' into feature/fixed-p2p-reconnection-issue
xgreenx Jun 3, 2024
e9da8b9
Merge branch 'master' into feature/fixed-p2p-reconnection-issue
xgreenx Jun 3, 2024
54d4fd5
Added comments
xgreenx Jun 3, 2024
290fe5f
Merge remote-tracking branch 'origin/feature/fixed-p2p-reconnection-i…
xgreenx Jun 3, 2024
94fc5b3
Merge branch 'master' into feature/fixed-p2p-reconnection-issue
xgreenx Jun 3, 2024
ac7f88c
Merge branch 'feature/fixed-p2p-reconnection-issue' into feature/spee…
xgreenx Jun 3, 2024
664738c
Merge branch 'master' into feature/fixed-p2p-reconnection-issue
xgreenx Jun 4, 2024
acd9493
Merge branch 'feature/fixed-p2p-reconnection-issue' into feature/spee…
xgreenx Jun 4, 2024
39ecd21
Merge branch 'master' into feature/speed-up-synchronization
xgreenx Jun 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Changed
- [#1916](https://github.com/FuelLabs/fuel-core/pull/1916): Speed up synchronisation of the blocks for the `fuel-core-sync` service.

#### Breaking

Expand All @@ -19,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [#1913](https://github.com/FuelLabs/fuel-core/pull/1913): Removed dead code from the project.

### Fixed
- [#1915](https://github.com/FuelLabs/fuel-core/pull/1915): Fixed reconnection issue in the dev cluster with AWS cluster.
- [#1914](https://github.com/FuelLabs/fuel-core/pull/1914): Fixed halting of the node during synchronization in PoA service.

## [Version 0.27.0]
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ pub struct P2PArgs {
#[clap(long = "request-timeout", default_value = "20", env)]
pub request_timeout: u64,

/// Choose max concurrent streams for RequestResponse protocol
#[clap(long = "request-max-concurrent-streams", default_value = "256", env)]
pub max_concurrent_streams: usize,

/// Choose how long RequestResponse protocol connections will live if idle
#[clap(long = "connection-keep-alive", default_value = "20", env)]
pub connection_keep_alive: u64,
Expand Down Expand Up @@ -308,6 +312,7 @@ impl P2PArgs {
gossipsub_config,
heartbeat_config,
set_request_timeout: Duration::from_secs(self.request_timeout),
max_concurrent_streams: self.max_concurrent_streams,
set_connection_keep_alive: Duration::from_secs(self.connection_keep_alive),
heartbeat_check_interval: Duration::from_secs(self.heartbeat_check_interval),
heartbeat_max_avg_interval: Duration::from_secs(
Expand Down
7 changes: 3 additions & 4 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ impl FuelBehaviour {
let req_res_protocol =
core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full));

let req_res_config = request_response::Config::default();
req_res_config
.clone()
.with_request_timeout(p2p_config.set_request_timeout);
let req_res_config = request_response::Config::default()
.with_request_timeout(p2p_config.set_request_timeout)
.with_max_concurrent_streams(p2p_config.max_concurrent_streams);

let request_response = request_response::Behaviour::with_codec(
codec,
Expand Down
89 changes: 17 additions & 72 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,15 @@ use crate::{
use fuel_core_types::blockchain::consensus::Genesis;

use libp2p::{
core::{
muxing::StreamMuxerBox,
transport::Boxed,
},
gossipsub,
identity::{
secp256k1,
Keypair,
},
noise,
tcp::{
self,
tokio,
},
yamux,
Multiaddr,
PeerId,
Transport,
};
use libp2p_mplex::MplexConfig;
use std::{
collections::HashSet,
net::{
Expand All @@ -44,12 +33,10 @@ use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
guarded_node::GuardedNode,
};
mod connection_tracker;
mod fuel_authenticated;
pub(crate) mod fuel_upgrade;
mod guarded_node;

const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20);

Expand All @@ -62,13 +49,9 @@ pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024;
/// Maximum number of headers per request.
pub const MAX_HEADERS_PER_REQUEST: u32 = 100;

/// Adds a timeout to the setup and protocol upgrade process for all
/// inbound and outbound connections established through the transport.
const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20);

#[derive(Clone, Debug)]
pub struct Config<State = Initialized> {
/// The keypair used for for handshake during communication with other p2p nodes.
/// The keypair used for handshake during communication with other p2p nodes.
pub keypair: Keypair,

/// Name of the Network
Expand Down Expand Up @@ -126,6 +109,8 @@ pub struct Config<State = Initialized> {
// RequestResponse related fields
/// Sets the timeout for inbound and outbound requests.
pub set_request_timeout: Duration,
/// Sets the maximum number of concurrent streams for a connection.
pub max_concurrent_streams: usize,
/// Sets the keep-alive timeout of idle connections.
pub set_connection_keep_alive: Duration,

Expand Down Expand Up @@ -180,6 +165,7 @@ impl Config<NotInitialized> {
gossipsub_config: self.gossipsub_config,
heartbeat_config: self.heartbeat_config,
set_request_timeout: self.set_request_timeout,
max_concurrent_streams: self.max_concurrent_streams,
set_connection_keep_alive: self.set_connection_keep_alive,
heartbeat_check_interval: self.heartbeat_check_interval,
heartbeat_max_avg_interval: self.heartbeat_max_time_since_last,
Expand Down Expand Up @@ -226,6 +212,7 @@ impl Config<NotInitialized> {
gossipsub_config: default_gossipsub_config(),
heartbeat_config: heartbeat::Config::default(),
set_request_timeout: REQ_RES_TIMEOUT,
max_concurrent_streams: 256,
set_connection_keep_alive: REQ_RES_TIMEOUT,
heartbeat_check_interval: Duration::from_secs(10),
heartbeat_max_avg_interval: Duration::from_secs(20),
Expand Down Expand Up @@ -254,71 +241,29 @@ impl Config<Initialized> {
pub(crate) fn build_transport_function(
p2p_config: &Config,
) -> (
impl FnOnce(&Keypair) -> Boxed<(PeerId, StreamMuxerBox)> + '_,
impl FnOnce(&Keypair) -> Result<FuelAuthenticated<ConnectionTracker>, ()> + '_,
Arc<RwLock<ConnectionState>>,
) {
let connection_state = ConnectionState::new();
let kept_connection_state = connection_state.clone();
let transport_function = move |keypair: &Keypair| {
let transport = {
let generate_tcp_transport = || {
tokio::Transport::new(tcp::Config::new().port_reuse(true).nodelay(true))
};

let tcp = generate_tcp_transport();

let ws_tcp = libp2p::websocket::WsConfig::new(generate_tcp_transport())
.or_transport(tcp);

libp2p::dns::tokio::Transport::system(ws_tcp).unwrap()
}
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

let noise_authenticated =
noise::Config::new(keypair).expect("Noise key generation failed");

let multiplex_config = {
let mplex_config = MplexConfig::default();

let mut yamux_config = yamux::Config::default();
// TODO: remove deprecated method call https://github.com/FuelLabs/fuel-core/issues/1592
#[allow(deprecated)]
yamux_config.set_max_buffer_size(MAX_RESPONSE_SIZE);
libp2p::core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
let connection_state = if p2p_config.reserved_nodes_only_mode {
None
} else {
Some(connection_state)
};

if p2p_config.reserved_nodes_only_mode {
let guarded_node = GuardedNode::new(&p2p_config.reserved_nodes);
let connection_tracker =
ConnectionTracker::new(&p2p_config.reserved_nodes, connection_state);

let fuel_authenticated = FuelAuthenticated::new(
noise_authenticated,
guarded_node,
p2p_config.checksum,
);

transport
.authenticate(fuel_authenticated)
.multiplex(multiplex_config)
.timeout(TRANSPORT_TIMEOUT)
.boxed()
} else {
let connection_tracker = ConnectionTracker::new(
&p2p_config.reserved_nodes,
connection_state.clone(),
);

let fuel_authenticated = FuelAuthenticated::new(
noise_authenticated,
connection_tracker,
p2p_config.checksum,
);

transport
.authenticate(fuel_authenticated)
.multiplex(multiplex_config)
.timeout(TRANSPORT_TIMEOUT)
.boxed()
}
Ok(FuelAuthenticated::new(
noise_authenticated,
connection_tracker,
p2p_config.checksum,
))
};

(transport_function, kept_connection_state)
Expand Down
10 changes: 6 additions & 4 deletions crates/services/p2p/src/config/connection_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::{
#[derive(Debug, Clone)]
pub(crate) struct ConnectionTracker {
reserved_nodes: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state: Option<Arc<RwLock<ConnectionState>>>,
}

impl ConnectionTracker {
pub(crate) fn new(
reserved_nodes: &[Multiaddr],
connection_state: Arc<RwLock<ConnectionState>>,
connection_state: Option<Arc<RwLock<ConnectionState>>>,
) -> Self {
Self {
reserved_nodes: peer_ids_set_from(reserved_nodes),
Expand All @@ -41,8 +41,10 @@ impl Approver for ConnectionTracker {
return true
}

if let Ok(connection_state) = self.connection_state.read() {
return connection_state.available_slot()
if let Some(connection_state) = &self.connection_state {
if let Ok(connection_state) = connection_state.read() {
return connection_state.available_slot()
}
}

false
Expand Down
44 changes: 21 additions & 23 deletions crates/services/p2p/src/config/fuel_authenticated.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::config::fuel_upgrade::Checksum;
use futures::{
future,
AsyncRead,
AsyncWrite,
Future,
TryFutureExt,
};
use libp2p::{
self,
Expand Down Expand Up @@ -71,17 +69,16 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future {
Box::pin(
self.noise_authenticated
.upgrade_inbound(socket, "")
.and_then(move |(remote_peer_id, io)| {
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(noise::Error::AuthenticationFailed)
}
}),
)
Box::pin(async move {
let (remote_peer_id, io) =
self.noise_authenticated.upgrade_inbound(socket, "").await?;

if self.approver.allow_peer(&remote_peer_id) {
Ok((remote_peer_id, io))
} else {
Err(noise::Error::AuthenticationFailed)
}
})
}
}

Expand All @@ -95,16 +92,17 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future {
Box::pin(
self.noise_authenticated
Box::pin(async move {
let (remote_peer_id, io) = self
.noise_authenticated
.upgrade_outbound(socket, "")
.and_then(move |(remote_peer_id, io)| {
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(noise::Error::AuthenticationFailed)
}
}),
)
.await?;

if self.approver.allow_peer(&remote_peer_id) {
Ok((remote_peer_id, io))
} else {
Err(noise::Error::AuthenticationFailed)
}
})
}
}
31 changes: 0 additions & 31 deletions crates/services/p2p/src/config/guarded_node.rs

This file was deleted.

Loading
Loading