Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace usage of futures-timer to avoid extra helper threads #982

Merged
merged 1 commit into from
Feb 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ fuel-core-types = { path = "../../types", features = [
"serde",
], version = "0.16.1" }
futures = "0.3"
futures-timer = "3.0"
ip_network = "0.4"
libp2p = { version = "=0.50.0", default-features = false, features = [
"dns",
Expand Down
7 changes: 4 additions & 3 deletions crates/services/p2p/src/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use self::mdns::MdnsWrapper;
use futures::FutureExt;
use futures_timer::Delay;
use ip_network::IpNetwork;
use libp2p::{
core::connection::ConnectionId,
Expand Down Expand Up @@ -33,6 +32,7 @@ use std::{
HashSet,
VecDeque,
},
pin::Pin,
task::{
Context,
Poll,
Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct DiscoveryBehaviour {

/// If enabled, the Stream that will fire after the delay expires,
/// starting new random walk
next_kad_random_walk: Option<Delay>,
next_kad_random_walk: Option<Pin<Box<tokio::time::Sleep>>>,

/// The Duration for the next random walk, after the current one ends
duration_to_next_kad: Duration,
Expand Down Expand Up @@ -175,7 +175,8 @@ impl NetworkBehaviour for DiscoveryBehaviour {
self.kademlia.get_closest_peers(random_peer_id);
}

*next_kad_random_query = Delay::new(self.duration_to_next_kad);
*next_kad_random_query =
Box::pin(tokio::time::sleep(self.duration_to_next_kad));
// duration to next random walk should either be exponentially bigger than the previous
// or at max 60 seconds
self.duration_to_next_kad =
Expand Down
5 changes: 3 additions & 2 deletions crates/services/p2p/src/discovery/discovery_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::discovery::{
mdns::MdnsWrapper,
DiscoveryBehaviour,
};
use futures_timer::Delay;
use libp2p::{
kad::{
store::MemoryStore,
Expand Down Expand Up @@ -161,7 +160,9 @@ impl DiscoveryConfig {
}

let next_kad_random_walk = {
let random_walk = self.random_walk.map(Delay::new);
let random_walk = self
.random_walk
.map(|duration| Box::pin(tokio::time::sleep(duration)));

// no need to preferm random walk if we don't want the node to connect to non-whitelisted peers
if !reserved_nodes_only_mode {
Expand Down
14 changes: 9 additions & 5 deletions crates/services/p2p/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use futures::{
AsyncWriteExt,
FutureExt,
};
use futures_timer::Delay;
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_swarm::{
handler::{
Expand All @@ -25,9 +24,14 @@ use libp2p_swarm::{
use std::{
fmt::Display,
num::NonZeroU32,
pin::Pin,
task::Poll,
time::Duration,
};
use tokio::time::{
sleep,
Sleep,
};
use tracing::debug;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -95,7 +99,7 @@ pub struct HeartbeatHandler {
config: HeartbeatConfig,
inbound: Option<InboundData>,
outbound: Option<OutboundState>,
timer: Delay,
timer: Pin<Box<Sleep>>,
failure_count: u32,
}

Expand All @@ -105,7 +109,7 @@ impl HeartbeatHandler {
config,
inbound: None,
outbound: None,
timer: Delay::new(Duration::new(0, 0)),
timer: Box::pin(sleep(Duration::new(0, 0))),
failure_count: 0,
}
}
Expand Down Expand Up @@ -139,7 +143,7 @@ impl ConnectionHandler for HeartbeatHandler {
stream,
}) => {
// start new send timeout
self.timer.reset(self.config.send_timeout);
self.timer = Box::pin(sleep(self.config.send_timeout));
// send latest `BlockHeight`
self.outbound = Some(OutboundState::SendingBlockHeight(
send_block_height(stream, block_height).boxed(),
Expand Down Expand Up @@ -220,7 +224,7 @@ impl ConnectionHandler for HeartbeatHandler {
// reset failure count
self.failure_count = 0;
// start new idle timeout until next request & send
self.timer.reset(self.config.idle_timeout);
self.timer = Box::pin(sleep(self.config.idle_timeout));
self.outbound = Some(OutboundState::Idle(stream));
}
Poll::Ready(Err(_)) => {
Expand Down