Skip to content

Commit

Permalink
Merge pull request paritytech#1459 from subspace/increase-backoff-time
Browse files Browse the repository at this point in the history
Remove DSN online status and increase get-piece backoff time.
  • Loading branch information
shamil-gadelshin authored May 16, 2023
2 parents a2d64d8 + e7eb251 commit da96b48
Show file tree
Hide file tree
Showing 9 changed files with 3 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use subspace_farmer::utils::run_future_in_dedicated_thread;
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::piece_caching::PieceMemoryCache;
use subspace_networking::libp2p::identity::{ed25519, Keypair};
use subspace_networking::utils::online_status_informer;
use subspace_networking::utils::piece_announcement::announce_single_piece_index_with_backoff;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
Expand Down Expand Up @@ -388,7 +387,6 @@ where
"farmer-networking".to_string(),
)?;
let mut networking_fut = Box::pin(networking_fut).fuse();
let status_informer_fut = online_status_informer(&node);

futures::select!(
// Signal future
Expand All @@ -403,11 +401,6 @@ where
_ = networking_fut => {
info!("Node runner exited.")
},

// Status informer future
_ = status_informer_fut.fuse() => {
info!("DSN online status observer exited.");
},
);

anyhow::Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use anyhow::anyhow;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
use either::Either;
use futures::{select, FutureExt};
use libp2p::identity::ed25519::Keypair;
use libp2p::{identity, Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
Expand All @@ -15,7 +14,6 @@ use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::online_status_informer;
use subspace_networking::{
peer_id, BootstrappedNetworkingParameters, Config, NetworkingParametersManager,
ParityDbProviderStorage, VoidProviderStorage,
Expand Down Expand Up @@ -199,19 +197,8 @@ async fn main() -> anyhow::Result<()> {
}))
.detach();

let status_informer_fut = online_status_informer(&node);
let networking_fut = node_runner.run();

info!("Subspace Bootstrap Node started");
select!(
// Status informer future
_ = status_informer_fut.fuse() => {
info!("DSN online status observer exited.");
},

// Node runner future
_ = networking_fut.fuse() => {},
);
node_runner.run().await;
}
Command::GenerateKeypair { json } => {
let output = KeypairOutput::new(Keypair::generate());
Expand Down
5 changes: 0 additions & 5 deletions crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use std::time::{Duration, Instant};
use std::{fmt, io, iter};
use subspace_core_primitives::{crypto, Piece};
use thiserror::Error;
use tokio::sync::watch;
use tracing::{debug, error, info};

const DEFAULT_NETWORK_PROTOCOL_VERSION: &str = "dev";
Expand Down Expand Up @@ -456,15 +455,12 @@ where

let kademlia_tasks_semaphore = ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS);
let regular_tasks_semaphore = ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS);
// DSN is not connected from the start.
let (online_status_observer_tx, online_status_observer_rx) = watch::channel(false);

let shared = Arc::new(Shared::new(
local_peer_id,
command_sender,
kademlia_tasks_semaphore,
regular_tasks_semaphore,
online_status_observer_rx,
));
let shared_weak = Arc::downgrade(&shared);

Expand All @@ -480,7 +476,6 @@ where
target_connections,
temporary_bans,
metrics,
online_status_observer_tx,
protocol_version,
});

Expand Down
5 changes: 0 additions & 5 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::{error, trace};

Expand Down Expand Up @@ -283,10 +282,6 @@ impl Node {
self.shared.id
}

pub fn online_status_observer(&self) -> &watch::Receiver<bool> {
&self.shared.online_status_observer_rx
}

pub async fn get_value(
&self,
key: Multihash,
Expand Down
22 changes: 0 additions & 22 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::Sleep;
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -108,8 +107,6 @@ where
metrics: Option<Metrics>,
/// Mapping from specific peer to number of established connections
established_connections: HashMap<(PeerId, ConnectedPoint), usize>,
/// DSN connection observer. Turns on/off DSN operations like piece retrieval.
online_status_observer_tx: watch::Sender<bool>,
/// Defines protocol version for the network peers. Affects network partition.
protocol_version: String,
}
Expand All @@ -129,7 +126,6 @@ where
pub(crate) target_connections: u32,
pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
pub(crate) metrics: Option<Metrics>,
pub(crate) online_status_observer_tx: watch::Sender<bool>,
pub(crate) protocol_version: String,
}

Expand All @@ -149,7 +145,6 @@ where
target_connections,
temporary_bans,
metrics,
online_status_observer_tx,
protocol_version,
}: NodeRunnerConfig<ProviderStorage>,
) -> Self {
Expand All @@ -172,7 +167,6 @@ where
temporary_bans,
metrics,
established_connections: HashMap::new(),
online_status_observer_tx,
protocol_version,
}
}
Expand Down Expand Up @@ -218,18 +212,6 @@ where
}
}

// Handle DSN online status signaling
fn signal_online_status(&mut self) {
let current_online_status = self.swarm.connected_peers().next().is_some();
let previous_online_status = *self.online_status_observer_tx.borrow();

if previous_online_status != current_online_status {
if let Err(err) = self.online_status_observer_tx.send(current_online_status) {
error!("DSN connection observer channel failed: {err}")
}
}
}

async fn handle_peer_dialing(&mut self) {
let local_peer_id = *self.swarm.local_peer_id();
let connected_peers = self.swarm.connected_peers().cloned().collect::<Vec<_>>();
Expand Down Expand Up @@ -377,8 +359,6 @@ where
num_established,
..
} => {
self.signal_online_status();

// Save known addresses that were successfully dialed.
if let ConnectedPoint::Dialer { address, .. } = &endpoint {
// filter non-global addresses when non-globals addresses are disabled
Expand Down Expand Up @@ -438,8 +418,6 @@ where
num_established,
..
} => {
self.signal_online_status();

let shared = match self.shared_weak.upgrade() {
Some(shared) => shared,
None => {
Expand Down
4 changes: 0 additions & 4 deletions crates/subspace-networking/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Debug)]
pub(crate) struct CreatedSubscription {
Expand Down Expand Up @@ -110,7 +109,6 @@ pub(crate) struct Shared {
pub(crate) command_sender: mpsc::Sender<Command>,
pub(crate) kademlia_tasks_semaphore: ResizableSemaphore,
pub(crate) regular_tasks_semaphore: ResizableSemaphore,
pub(crate) online_status_observer_rx: watch::Receiver<bool>,
}

impl Shared {
Expand All @@ -119,7 +117,6 @@ impl Shared {
command_sender: mpsc::Sender<Command>,
kademlia_tasks_semaphore: ResizableSemaphore,
regular_tasks_semaphore: ResizableSemaphore,
online_status_observer_rx: watch::Receiver<bool>,
) -> Self {
Self {
handlers: Handlers::default(),
Expand All @@ -129,7 +126,6 @@ impl Shared {
command_sender,
kademlia_tasks_semaphore,
regular_tasks_semaphore,
online_status_observer_rx,
}
}
}
24 changes: 1 addition & 23 deletions crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ pub(crate) mod prometheus;
mod tests;
pub(crate) mod unique_record_binary_heap;

use crate::Node;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::future::Future;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Notify;
use tracing::{debug, warn};
use tracing::warn;

/// This test is successful only for global IP addresses and DNS names.
pub(crate) fn is_global_address_or_dns(addr: &Multiaddr) -> bool {
Expand Down Expand Up @@ -287,23 +285,3 @@ impl Drop for ResizableSemaphorePermit {
}
}
}

/// Prints DSN online status changes.
pub fn online_status_informer(node: &Node) -> impl Future<Output = ()> {
let mut online_status_observer = node.online_status_observer().clone();
async move {
loop {
if online_status_observer.changed().await.is_err() {
return; // app is closing
}

let online_status = *online_status_observer.borrow();

if online_status {
debug!("DSN connection established.");
} else {
debug!("DSN connection lost.");
}
}
}
}
23 changes: 1 addition & 22 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{debug, error, trace, warn};
/// Defines initial duration between get_piece calls.
const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(3);
/// Defines max duration between get_piece calls.
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(10);
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);

#[async_trait]
pub trait PieceValidator: Sync + Send {
Expand Down Expand Up @@ -135,27 +135,6 @@ where
retry(backoff, || async {
let current_attempt = retries.fetch_add(1, Ordering::Relaxed);

// Wait until we connect to DSN.
let mut online_status_observer = self.node.online_status_observer().clone();
// We have a loop because we can be notified about the offline status multiple times
// and it will "emit the changed event".
loop {
let online = *online_status_observer.borrow();

if !online {
debug!(%piece_index, current_attempt, "Couldn't get a piece from DSN. No DSN connection...");

// Wait until we get the updates.
if let Err(err) = online_status_observer.changed().await {
return Err(backoff::Error::permanent(
format!("DSN status observer closed the channel's sender: {err}", ).into(),
))
}
} else {
break;
}
}

if let Some(piece) = self.get_piece_from_storage(piece_index).await {
trace!(%piece_index, current_attempt, "Got piece");
return Ok(Some(piece));
Expand Down
13 changes: 0 additions & 13 deletions crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ use subspace_fraud_proof::domain_extrinsics_builder::SystemDomainExtrinsicsBuild
use subspace_fraud_proof::verifier_api::VerifierClient;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::libp2p::Multiaddr;
use subspace_networking::utils::online_status_informer;
use subspace_networking::{peer_id, Node};
use subspace_proof_of_space::Table;
use subspace_runtime_primitives::opaque::Block;
Expand Down Expand Up @@ -635,18 +634,6 @@ where
}))
.detach();

let status_informer_fut = online_status_informer(&node);
task_manager.spawn_handle().spawn(
"status-observer",
Some("subspace-networking"),
Box::pin(
async move {
status_informer_fut.await;
}
.in_current_span(),
),
);

task_manager.spawn_essential_handle().spawn_essential(
"node-runner",
Some("subspace-networking"),
Expand Down

0 comments on commit da96b48

Please sign in to comment.