diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 0da2bf36c..9d82815f0 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -3,16 +3,17 @@ //! Relay +use std::collections::VecDeque; use std::fmt; #[cfg(not(target_arch = "wasm32"))] use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; #[cfg(feature = "nip11")] use nostr::nips::nip11::RelayInformationDocument; -use nostr::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, Timestamp, Url}; +use nostr::{ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, Timestamp, Url}; use nostr_sdk_net::futures_util::{Future, SinkExt, StreamExt}; use nostr_sdk_net::{self as net, WsMessage}; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -91,7 +92,6 @@ impl fmt::Display for RelayStatus { pub enum RelayEvent { /// Send [`ClientMessage`] SendMsg(Box), - // Ping, /// Close Close, /// Stop @@ -155,6 +155,7 @@ pub struct RelayConnectionStats { attempts: Arc, success: Arc, connected_at: Arc, + latencies: Arc>>, } impl Default for RelayConnectionStats { @@ -170,6 +171,7 @@ impl RelayConnectionStats { attempts: Arc::new(AtomicUsize::new(0)), success: Arc::new(AtomicUsize::new(0)), connected_at: Arc::new(AtomicU64::new(0)), + latencies: Arc::new(Mutex::new(VecDeque::new())), } } @@ -188,6 +190,19 @@ impl RelayConnectionStats { Timestamp::from(self.connected_at.load(Ordering::SeqCst)) } + /// Calculate latency + pub async fn latency(&self) -> Option { + let latencies = self.latencies.lock().await; + let sum: Duration = latencies.iter().sum(); + sum.checked_div(latencies.len() as u32) + } + + /// Calculate latency + #[cfg(feature = "blocking")] + pub fn latency_blocking(&self) -> Option { + RUNTIME.block_on(async { self.latency().await }) + } + pub(crate) fn new_attempt(&self) { self.attempts.fetch_add(1, Ordering::SeqCst); } @@ -200,6 +215,14 @@ impl RelayConnectionStats { Some(Timestamp::now().as_u64()) }); } + + pub(crate) async fn save_latency(&self, latency: Duration) { + let mut latencies = self.latencies.lock().await; + if latencies.len() >= 5 { + latencies.pop_back(); + } + latencies.push_front(latency) + } } /// Relay instance's actual subscription with its unique id @@ -506,6 +529,28 @@ impl Relay { self.stats.new_success(); + let relay = self.clone(); + let ping_abort_handle = thread::abortable(async move { + log::debug!("Relay Ping Thread Started"); + + loop { + let filter = Filter::new().kind(Kind::Custom(1000)).limit(1); + let now = Instant::now(); + match relay.get_events_of(vec![filter], None).await { + Ok(_) => { + relay.stats.save_latency(now.elapsed()).await; + } + Err(e) => { + log::error!("Impossible to ping relay {}: {e}", relay.url); + break; + } + }; + thread::sleep(Duration::from_secs(60)).await; + } + + log::debug!("Exited from Ping Thread of {}", relay.url); + }); + let relay = self.clone(); thread::spawn(async move { log::debug!("Relay Event Thread Started"); @@ -569,6 +614,8 @@ impl Relay { } } log::debug!("Exited from Relay Event Thread"); + + ping_abort_handle.abort(); }); let relay = self.clone();