Skip to content

Commit

Permalink
sdk: add latency to RelayConnectionStats
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Jun 23, 2023
1 parent c4da6fd commit 67aeb50
Showing 1 changed file with 50 additions and 3 deletions.
53 changes: 50 additions & 3 deletions crates/nostr-sdk/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@

//! Relay

use std::collections::VecDeque;
use std::fmt;
#[cfg(not(target_arch = "wasm32"))]
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

#[cfg(feature = "nip11")]
use nostr::nips::nip11::RelayInformationDocument;
use nostr::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, Timestamp, Url};
use nostr::{ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, Timestamp, Url};
use nostr_sdk_net::futures_util::{Future, SinkExt, StreamExt};
use nostr_sdk_net::{self as net, WsMessage};
use tokio::sync::mpsc::{self, Receiver, Sender};
Expand Down Expand Up @@ -91,7 +92,6 @@ impl fmt::Display for RelayStatus {
pub enum RelayEvent {
/// Send [`ClientMessage`]
SendMsg(Box<ClientMessage>),
// Ping,
/// Close
Close,
/// Stop
Expand Down Expand Up @@ -155,6 +155,7 @@ pub struct RelayConnectionStats {
attempts: Arc<AtomicUsize>,
success: Arc<AtomicUsize>,
connected_at: Arc<AtomicU64>,
latencies: Arc<Mutex<VecDeque<Duration>>>,
}

impl Default for RelayConnectionStats {
Expand All @@ -170,6 +171,7 @@ impl RelayConnectionStats {
attempts: Arc::new(AtomicUsize::new(0)),
success: Arc::new(AtomicUsize::new(0)),
connected_at: Arc::new(AtomicU64::new(0)),
latencies: Arc::new(Mutex::new(VecDeque::new())),
}
}

Expand All @@ -188,6 +190,19 @@ impl RelayConnectionStats {
Timestamp::from(self.connected_at.load(Ordering::SeqCst))
}

/// Calculate latency
pub async fn latency(&self) -> Option<Duration> {
let latencies = self.latencies.lock().await;
let sum: Duration = latencies.iter().sum();
sum.checked_div(latencies.len() as u32)
}

/// Calculate latency
#[cfg(feature = "blocking")]
pub fn latency_blocking(&self) -> Option<Duration> {
RUNTIME.block_on(async { self.latency().await })
}

pub(crate) fn new_attempt(&self) {
self.attempts.fetch_add(1, Ordering::SeqCst);
}
Expand All @@ -200,6 +215,14 @@ impl RelayConnectionStats {
Some(Timestamp::now().as_u64())
});
}

pub(crate) async fn save_latency(&self, latency: Duration) {
let mut latencies = self.latencies.lock().await;
if latencies.len() >= 10 {
latencies.pop_back();
}
latencies.push_front(latency)
}
}

/// Relay instance's actual subscription with its unique id
Expand Down Expand Up @@ -506,6 +529,28 @@ impl Relay {

self.stats.new_success();

let relay = self.clone();
let ping_abort_handle = thread::abortable(async move {
log::debug!("Relay Ping Thread Started");

loop {
let filter = Filter::new().kind(Kind::Custom(1000)).limit(1);
let now = Instant::now();
match relay.get_events_of(vec![filter], None).await {
Ok(_) => {
relay.stats.save_latency(now.elapsed()).await;
}
Err(e) => {
log::error!("Impossible to ping relay {}: {e}", relay.url);
break;
}
};
tokio::time::sleep(Duration::from_secs(60)).await;
}

log::debug!("Exited from Ping Thread of {}", relay.url);
});

let relay = self.clone();
thread::spawn(async move {
log::debug!("Relay Event Thread Started");
Expand Down Expand Up @@ -569,6 +614,8 @@ impl Relay {
}
}
log::debug!("Exited from Relay Event Thread");

ping_abort_handle.abort();
});

let relay = self.clone();
Expand Down

0 comments on commit 67aeb50

Please sign in to comment.