diff --git a/bindings/nostr-sdk-ffi/src/client/mod.rs b/bindings/nostr-sdk-ffi/src/client/mod.rs index b76ca0b3b..5e1d9b07c 100644 --- a/bindings/nostr-sdk-ffi/src/client/mod.rs +++ b/bindings/nostr-sdk-ffi/src/client/mod.rs @@ -164,6 +164,29 @@ impl Client { // TODO: add get_events_of_with_opts + /// Get events of filters from specific relays + /// + /// Get events both from **local database** and **relays** + /// + /// If no relay is specified, will be queried only the database. + pub fn get_events_from( + &self, + urls: Vec, + filters: Vec>, + timeout: Option, + ) -> Result>> { + let filters = filters + .into_iter() + .map(|f| f.as_ref().deref().clone()) + .collect(); + Ok(self + .inner + .get_events_from(urls, filters, timeout)? + .into_iter() + .map(|e| Arc::new(e.into())) + .collect()) + } + pub fn req_events_of(&self, filters: Vec>, timeout: Option) { let filters = filters .into_iter() @@ -178,8 +201,8 @@ impl Client { Ok(self.inner.send_msg(msg.into())?) } - pub fn send_msg_to(&self, url: String, msg: ClientMessage) -> Result<()> { - Ok(self.inner.send_msg_to(url, msg.into())?) + pub fn send_msg_to(&self, urls: Vec, msg: ClientMessage) -> Result<()> { + Ok(self.inner.send_msg_to(urls, msg.into())?) } pub fn send_event(&self, event: Arc) -> Result> { @@ -190,10 +213,10 @@ impl Client { )) } - pub fn send_event_to(&self, url: String, event: Arc) -> Result> { + pub fn send_event_to(&self, urls: Vec, event: Arc) -> Result> { Ok(Arc::new( self.inner - .send_event_to(url, event.as_ref().deref().clone())? + .send_event_to(urls, event.as_ref().deref().clone())? .into(), )) } @@ -223,12 +246,12 @@ impl Client { /// Rise an error if the [`ClientSigner`] is not set. pub fn send_event_builder_to( &self, - url: String, + urls: Vec, builder: Arc, ) -> Result> { Ok(Arc::new( self.inner - .send_event_builder_to(url, builder.as_ref().deref().clone())? + .send_event_builder_to(urls, builder.as_ref().deref().clone())? .into(), )) } diff --git a/bindings/nostr-sdk-ffi/src/relay/mod.rs b/bindings/nostr-sdk-ffi/src/relay/mod.rs index 1868cbc8e..f134e883d 100644 --- a/bindings/nostr-sdk-ffi/src/relay/mod.rs +++ b/bindings/nostr-sdk-ffi/src/relay/mod.rs @@ -13,6 +13,7 @@ use uniffi::{Enum, Object}; pub mod options; +use self::options::RelaySendOptions; use crate::error::Result; #[derive(Object)] @@ -182,11 +183,11 @@ impl Relay { block_on(async move { Ok(self.inner.terminate().await?) }) } - pub fn send_msg(&self, msg: ClientMessage, wait: Option) -> Result<()> { - block_on(async move { Ok(self.inner.send_msg(msg.into(), wait).await?) }) + pub fn send_msg(&self, msg: ClientMessage, opts: Arc) -> Result<()> { + block_on(async move { Ok(self.inner.send_msg(msg.into(), **opts).await?) }) } - pub fn subscribe(&self, filters: Vec>, wait: Option) -> Result<()> { + pub fn subscribe(&self, filters: Vec>, opts: Arc) -> Result<()> { block_on(async move { Ok(self .inner @@ -195,14 +196,14 @@ impl Relay { .into_iter() .map(|f| f.as_ref().deref().clone()) .collect(), - wait, + **opts, ) .await?) }) } - pub fn unsubscribe(&self, wait: Option) -> Result<()> { - block_on(async move { Ok(self.inner.unsubscribe(wait).await?) }) + pub fn unsubscribe(&self, opts: Arc) -> Result<()> { + block_on(async move { Ok(self.inner.unsubscribe(**opts).await?) }) } pub fn get_events_of( diff --git a/bindings/nostr-sdk-ffi/src/relay/options.rs b/bindings/nostr-sdk-ffi/src/relay/options.rs index 275931082..df41de42e 100644 --- a/bindings/nostr-sdk-ffi/src/relay/options.rs +++ b/bindings/nostr-sdk-ffi/src/relay/options.rs @@ -9,6 +9,53 @@ use std::time::Duration; use nostr_ffi::helper::unwrap_or_clone_arc; use uniffi::{Enum, Object}; +#[derive(Clone, Object)] +pub struct RelaySendOptions { + inner: nostr_sdk::RelaySendOptions, +} + +impl Deref for RelaySendOptions { + type Target = nostr_sdk::RelaySendOptions; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[uniffi::export] +impl RelaySendOptions { + /// New default `RelaySendOptions` + #[uniffi::constructor] + pub fn new() -> Self { + Self { + inner: nostr_sdk::RelaySendOptions::default(), + } + } + + /// Skip wait for disconnected relay (default: true) + pub fn skip_disconnected(self: Arc, value: bool) -> Self { + let mut builder = unwrap_or_clone_arc(self); + builder.inner = builder.inner.skip_disconnected(value); + builder + } + + /// Skip wait for confirmation that message is sent (default: false) + pub fn skip_send_confirmation(self: Arc, value: bool) -> Self { + let mut builder = unwrap_or_clone_arc(self); + builder.inner = builder.inner.skip_send_confirmation(value); + builder + } + + /// Timeout for sending event (default: 10 secs) + /// + /// If `None`, the default timeout will be used + pub fn timeout(self: Arc, timeout: Option) -> Self { + let mut builder = unwrap_or_clone_arc(self); + builder.inner = builder.inner.timeout(timeout); + builder + } +} + #[derive(Enum)] pub enum NegentropyDirection { Up, diff --git a/bindings/nostr-sdk-js/examples/client.js b/bindings/nostr-sdk-js/examples/client.js index 23fe89ba5..c31f41c1c 100644 --- a/bindings/nostr-sdk-js/examples/client.js +++ b/bindings/nostr-sdk-js/examples/client.js @@ -46,7 +46,7 @@ async function main() { await client.sendEvent(event); // Send custom event to a specific previously added relay - // await client.sendEventTo("wss://relay.damus.io", event); + // await client.sendEventTo(["wss://relay.damus.io"], event); let builder = new EventBuilder(1111, "My custom event signer with the ClientSigner", []); await client.sendEventBuilder(builder); diff --git a/bindings/nostr-sdk-js/src/client/mod.rs b/bindings/nostr-sdk-js/src/client/mod.rs index 0bb85503a..ab0837ca0 100644 --- a/bindings/nostr-sdk-js/src/client/mod.rs +++ b/bindings/nostr-sdk-js/src/client/mod.rs @@ -25,6 +25,7 @@ pub use self::signer::JsClientSigner; use self::zapper::{JsZapDetails, JsZapEntity}; use crate::abortable::JsAbortHandle; use crate::database::JsNostrDatabase; +use crate::duration::JsDuration; use crate::relay::options::JsNegentropyOptions; use crate::relay::{JsRelay, JsRelayArray}; @@ -163,10 +164,10 @@ impl JsClient { pub async fn get_events_of( &self, filters: Vec, - timeout: Option, + timeout: Option, ) -> Result { let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); - let timeout: Option = timeout.map(Duration::from_secs_f64); + let timeout: Option = timeout.map(|d| *d); let events: Vec = self .inner .get_events_of(filters, timeout) @@ -183,6 +184,36 @@ impl JsClient { Ok(events) } + /// Get events of filters from specific relays + /// + /// Get events both from **local database** and **relays** + /// + /// If no relay is specified, will be queried only the database. + #[wasm_bindgen(js_name = getEventsFrom)] + pub async fn get_events_from( + &self, + urls: Vec, + filters: Vec, + timeout: Option, + ) -> Result { + let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); + let timeout: Option = timeout.map(|d| *d); + let events: Vec = self + .inner + .get_events_from(urls, filters, timeout) + .await + .map_err(into_err)?; + let events: JsEventArray = events + .into_iter() + .map(|e| { + let e: JsEvent = e.into(); + JsValue::from(e) + }) + .collect::() + .unchecked_into(); + Ok(events) + } + /// Request events of filters. /// All events will be received on notification listener /// until the EOSE "end of stored events" message is received from the relay. @@ -204,9 +235,9 @@ impl JsClient { /// Send client message to a specific relay #[wasm_bindgen(js_name = sendMsgTo)] - pub async fn send_msg_to(&self, url: String, msg: &JsClientMessage) -> Result<()> { + pub async fn send_msg_to(&self, urls: Vec, msg: &JsClientMessage) -> Result<()> { self.inner - .send_msg_to(url, msg.deref().clone()) + .send_msg_to(urls, msg.deref().clone()) .await .map_err(into_err) } @@ -229,9 +260,9 @@ impl JsClient { /// This method will wait for the `OK` message from the relay. /// If you not want to wait for the `OK` message, use `sendMsgTo` method instead. #[wasm_bindgen(js_name = sendEventTo)] - pub async fn send_event_to(&self, url: String, event: &JsEvent) -> Result { + pub async fn send_event_to(&self, urls: Vec, event: &JsEvent) -> Result { self.inner - .send_event_to(url, event.deref().clone()) + .send_event_to(urls, event.deref().clone()) .await .map_err(into_err) .map(|id| id.into()) @@ -265,11 +296,11 @@ impl JsClient { #[wasm_bindgen(js_name = sendEventBuilderTo)] pub async fn send_event_builder_to( &self, - url: String, + urls: Vec, builder: &JsEventBuilder, ) -> Result { self.inner - .send_event_builder_to(url, builder.deref().clone()) + .send_event_builder_to(urls, builder.deref().clone()) .await .map_err(into_err) .map(|id| id.into()) diff --git a/crates/nostr-sdk/README.md b/crates/nostr-sdk/README.md index 3a27dc6d7..295c059fb 100644 --- a/crates/nostr-sdk/README.md +++ b/crates/nostr-sdk/README.md @@ -95,7 +95,7 @@ async fn main() -> Result<()> { // client.send_event(event).await?; // Send custom event to a specific previously added relay - client.send_event_to("wss://relay.damus.io", event).await?; + client.send_event_to(["wss://relay.damus.io"], event).await?; Ok(()) } diff --git a/crates/nostr-sdk/examples/client.rs b/crates/nostr-sdk/examples/client.rs index c58dbf730..bd0bcc0ec 100644 --- a/crates/nostr-sdk/examples/client.rs +++ b/crates/nostr-sdk/examples/client.rs @@ -16,18 +16,19 @@ async fn main() -> Result<()> { let client = Client::new(&my_keys); client.add_relay("wss://relay.damus.io").await?; client.add_relay("wss://nostr.wine").await?; + client.add_relay("wss://relay.rip").await?; client.connect().await; // Publish a text note client.publish_text_note("Hello world", []).await?; - // Create a text note POW event + // Create a text note POW event and broadcast to all connected relays let event: Event = EventBuilder::text_note("POW text note from nostr-sdk", []).to_pow_event(&my_keys, 20)?; client.send_event(event).await?; - // Send multiple events at once + // Send multiple events at once (to all relays) let mut events: Vec = Vec::new(); for i in 0..10 { events.push(EventBuilder::text_note(format!("Event #{i}"), []).to_event(&my_keys)?); @@ -35,5 +36,12 @@ async fn main() -> Result<()> { let opts = RelaySendOptions::default(); client.batch_event(events, opts).await?; + // Send event to specific relays + let event: Event = EventBuilder::text_note("POW text note from nostr-sdk 16", []) + .to_pow_event(&my_keys, 16)?; + client + .send_event_to(["wss://relay.damus.io", "wss://relay.rip"], event) + .await?; + Ok(()) } diff --git a/crates/nostr-sdk/examples/get-events-of.rs b/crates/nostr-sdk/examples/get-events-of.rs index ad2067625..b85d0a781 100644 --- a/crates/nostr-sdk/examples/get-events-of.rs +++ b/crates/nostr-sdk/examples/get-events-of.rs @@ -17,15 +17,31 @@ async fn main() -> Result<()> { let client = Client::default(); client.add_relay("wss://relay.damus.io").await?; client.add_relay("wss://nostr.wine").await?; + client.add_relay("wss://relay.rip").await?; client.add_relay("wss://relay.nostr.info").await?; client.connect().await; + // Get events from all connected relays let filter = Filter::new().author(public_key).kind(Kind::Metadata); let events = client .get_events_of(vec![filter], Some(Duration::from_secs(10))) .await; println!("{events:#?}"); + // Get events from specific relays + let filter = Filter::new() + .author(public_key) + .kind(Kind::TextNote) + .limit(3); + let events = client + .get_events_from( + ["wss://relay.damus.io", "wss://relay.rip"], + vec![filter], + Some(Duration::from_secs(10)), + ) + .await; + println!("{events:#?}"); + Ok(()) } diff --git a/crates/nostr-sdk/examples/subscriptions.rs b/crates/nostr-sdk/examples/subscriptions.rs index 11bf6f2fa..aaffc4c20 100644 --- a/crates/nostr-sdk/examples/subscriptions.rs +++ b/crates/nostr-sdk/examples/subscriptions.rs @@ -41,7 +41,7 @@ async fn main() -> Result<()> { .subscribe_with_internal_id( InternalSubscriptionId::Custom(String::from("other-id")), vec![other_filters], - None, + RelaySendOptions::default(), ) .await?; @@ -61,7 +61,7 @@ async fn main() -> Result<()> { .subscribe_with_internal_id( InternalSubscriptionId::Custom(String::from("other-id")), vec![other_filters], - None, + RelaySendOptions::default(), ) .await?; } else { @@ -73,7 +73,7 @@ async fn main() -> Result<()> { relay .unsubscribe_with_internal_id( InternalSubscriptionId::Custom(String::from("other-id")), - None, + RelaySendOptions::default(), ) .await?; // OR diff --git a/crates/nostr-sdk/src/client/blocking.rs b/crates/nostr-sdk/src/client/blocking.rs index 49821450e..9acdaf5e0 100644 --- a/crates/nostr-sdk/src/client/blocking.rs +++ b/crates/nostr-sdk/src/client/blocking.rs @@ -193,6 +193,20 @@ impl Client { RUNTIME.block_on(async { self.client.get_events_of(filters, timeout).await }) } + pub fn get_events_from( + &self, + urls: I, + filters: Vec, + timeout: Option, + ) -> Result, Error> + where + I: IntoIterator, + U: TryIntoUrl, + pool::Error: From<::Err>, + { + RUNTIME.block_on(async { self.client.get_events_from(urls, filters, timeout).await }) + } + pub fn req_events_of(&self, filters: Vec, timeout: Option) { RUNTIME.block_on(async { self.client.req_events_of(filters, timeout).await; @@ -203,12 +217,13 @@ impl Client { RUNTIME.block_on(async { self.client.send_msg(msg).await }) } - pub fn send_msg_to(&self, url: U, msg: ClientMessage) -> Result<(), Error> + pub fn send_msg_to(&self, urls: I, msg: ClientMessage) -> Result<(), Error> where + I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { - RUNTIME.block_on(async { self.client.send_msg_to(url, msg).await }) + RUNTIME.block_on(async { self.client.send_msg_to(urls, msg).await }) } /// Send event @@ -216,12 +231,13 @@ impl Client { RUNTIME.block_on(async { self.client.send_event(event).await }) } - pub fn send_event_to(&self, url: U, event: Event) -> Result + pub fn send_event_to(&self, urls: I, event: Event) -> Result where + I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { - RUNTIME.block_on(async { self.client.send_event_to(url, event).await }) + RUNTIME.block_on(async { self.client.send_event_to(urls, event).await }) } pub fn sign_event_builder(&self, builder: EventBuilder) -> Result { @@ -232,12 +248,17 @@ impl Client { RUNTIME.block_on(async { self.client.send_event_builder(builder).await }) } - pub fn send_event_builder_to(&self, url: U, builder: EventBuilder) -> Result + pub fn send_event_builder_to( + &self, + urls: I, + builder: EventBuilder, + ) -> Result where + I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { - RUNTIME.block_on(async { self.client.send_event_builder_to(url, builder).await }) + RUNTIME.block_on(async { self.client.send_event_builder_to(urls, builder).await }) } pub fn set_metadata(&self, metadata: &Metadata) -> Result { diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 685f821a4..f148282b7 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -186,9 +186,7 @@ impl Drop for Client { tracing::warn!("Client already dropped"); } else { tracing::debug!("Dropping the Client..."); - let _ = self - .dropped - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + self.dropped.store(true, Ordering::SeqCst); let client: Client = self.clone(); thread::spawn(async move { client @@ -549,32 +547,14 @@ impl Client { /// # } /// ``` pub async fn subscribe(&self, filters: Vec) { - let wait: Option = if self.opts.get_wait_for_subscription() { - self.opts.send_timeout - } else { - None - }; - self.pool.subscribe(filters, wait).await; - } - - /// Subscribe to filters with custom wait - pub async fn subscribe_with_custom_wait(&self, filters: Vec, wait: Option) { - self.pool.subscribe(filters, wait).await; + let opts: RelaySendOptions = self.opts.get_wait_for_subscription(); + self.pool.subscribe(filters, opts).await; } /// Unsubscribe from filters pub async fn unsubscribe(&self) { - let wait: Option = if self.opts.get_wait_for_subscription() { - self.opts.send_timeout - } else { - None - }; - self.pool.unsubscribe(wait).await; - } - - /// Unsubscribe from filters with custom wait - pub async fn unsubscribe_with_custom_wait(&self, wait: Option) { - self.pool.unsubscribe(wait).await; + let opts: RelaySendOptions = self.opts.get_wait_for_subscription(); + self.pool.unsubscribe(opts).await; } /// Get events of filters @@ -627,7 +607,31 @@ impl Client { Ok(self.pool.get_events_of(filters, timeout, opts).await?) } + /// Get events of filters from specific relays + /// + /// Get events both from **local database** and **relays** + /// + /// If no relay is specified, will be queried only the database. + pub async fn get_events_from( + &self, + urls: I, + filters: Vec, + timeout: Option, + ) -> Result, Error> + where + I: IntoIterator, + U: TryIntoUrl, + pool::Error: From<::Err>, + { + let timeout: Duration = timeout.unwrap_or(self.opts.timeout); + Ok(self + .pool + .get_events_from(urls, filters, timeout, FilterOptions::ExitOnEOSE) + .await?) + } + /// Request events of filters + /// /// All events will be received on notification listener (`client.notifications()`) /// until the EOSE "end of stored events" message is received from the relay. /// @@ -646,84 +650,120 @@ impl Client { timeout: Option, opts: FilterOptions, ) { - let timeout: Duration = match timeout { - Some(t) => t, - None => self.opts.timeout, - }; + let timeout: Duration = timeout.unwrap_or(self.opts.timeout); self.pool.req_events_of(filters, timeout, opts).await; } - /// Send client message - pub async fn send_msg(&self, msg: ClientMessage) -> Result<(), Error> { - let wait: Option = if self.opts.get_wait_for_send() { - self.opts.send_timeout - } else { - None - }; - self.pool.send_msg(msg, wait).await?; + /// Request events of filters from specific relays + /// + /// All events will be received on notification listener (`client.notifications()`) + /// until the EOSE "end of stored events" message is received from the relay. + /// + /// If timeout is set to `None`, the default from [`Options`] will be used. + pub async fn req_events_from( + &self, + urls: I, + filters: Vec, + timeout: Option, + ) -> Result<(), Error> + where + I: IntoIterator, + U: TryIntoUrl, + pool::Error: From<::Err>, + { + let timeout: Duration = timeout.unwrap_or(self.opts.timeout); + self.pool + .req_events_from(urls, filters, timeout, FilterOptions::ExitOnEOSE) + .await?; Ok(()) } - /// Batch send client messages + /// Send client message to **all relays** + pub async fn send_msg(&self, msg: ClientMessage) -> Result<(), Error> { + let opts: RelaySendOptions = self.opts.get_wait_for_send(); + Ok(self.pool.send_msg(msg, opts).await?) + } + + /// Batch send client messages to **all relays** pub async fn batch_msg( &self, msgs: Vec, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> { - self.pool.batch_msg(msgs, wait).await?; - Ok(()) + Ok(self.pool.batch_msg(msgs, opts).await?) } - /// Send client message to a specific relay - pub async fn send_msg_to(&self, url: U, msg: ClientMessage) -> Result<(), Error> + /// Send client message to a **specific relays** + pub async fn send_msg_to(&self, urls: I, msg: ClientMessage) -> Result<(), Error> where + I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { - let wait: Option = if self.opts.get_wait_for_send() { - self.opts.send_timeout - } else { - None - }; - Ok(self.pool.send_msg_to(url, msg, wait).await?) + let opts: RelaySendOptions = self.opts.get_wait_for_send(); + Ok(self.pool.send_msg_to(urls, msg, opts).await?) + } + + /// Batch send client messages to **specific relays** + pub async fn batch_msg_to( + &self, + urls: I, + msgs: Vec, + opts: RelaySendOptions, + ) -> Result<(), Error> + where + I: IntoIterator, + U: TryIntoUrl, + pool::Error: From<::Err>, + { + Ok(self.pool.batch_msg_to(urls, msgs, opts).await?) } - /// Send event + /// Send event to **all relays** /// /// This method will wait for the `OK` message from the relay. /// If you not want to wait for the `OK` message, use `send_msg` method instead. pub async fn send_event(&self, event: Event) -> Result { - let timeout: Option = self.opts.send_timeout; - let opts = RelaySendOptions::new() - .skip_disconnected(self.opts.get_skip_disconnected_relays()) - .timeout(timeout); + let opts: RelaySendOptions = self.opts.get_wait_for_send(); Ok(self.pool.send_event(event, opts).await?) } - /// Send multiple [`Event`] at once + /// Send multiple [`Event`] at once to **all relays**. pub async fn batch_event( &self, events: Vec, opts: RelaySendOptions, ) -> Result<(), Error> { - self.pool.batch_event(events, opts).await?; - Ok(()) + Ok(self.pool.batch_event(events, opts).await?) } - /// Send event to specific relay + /// Send event to **specific relays**. /// /// This method will wait for the `OK` message from the relay. /// If you not want to wait for the `OK` message, use `send_msg` method instead. - pub async fn send_event_to(&self, url: U, event: Event) -> Result + pub async fn send_event_to(&self, urls: I, event: Event) -> Result + where + I: IntoIterator, + U: TryIntoUrl, + pool::Error: From<::Err>, + { + let opts: RelaySendOptions = self.opts.get_wait_for_send(); + Ok(self.pool.send_event_to(urls, event, opts).await?) + } + + /// Send multiple [`Event`] at once to **specific relays**. + pub async fn batch_event_to( + &self, + urls: I, + events: Vec, + opts: RelaySendOptions, + ) -> Result<(), Error> where + I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { - let timeout: Option = self.opts.send_timeout; - let opts = RelaySendOptions::new() - .skip_disconnected(self.opts.get_skip_disconnected_relays()) - .timeout(timeout); - Ok(self.pool.send_event_to(url, event, opts).await?) + Ok(self.pool.batch_event_to(urls, events, opts).await?) } /// Signs the [`EventBuilder`] into an [`Event`] using the [`ClientSigner`] @@ -779,7 +819,7 @@ impl Client { } } - /// Take an [`EventBuilder`], sign it by using the [`ClientSigner`] and broadcast to all relays. + /// Take an [`EventBuilder`], sign it by using the [`ClientSigner`] and broadcast to **all relays**. /// /// Rise an error if the [`ClientSigner`] is not set. pub async fn send_event_builder(&self, builder: EventBuilder) -> Result { @@ -787,20 +827,21 @@ impl Client { self.send_event(event).await } - /// Take an [`EventBuilder`], sign it by using the [`ClientSigner`] and broadcast to specific relays. + /// Take an [`EventBuilder`], sign it by using the [`ClientSigner`] and broadcast to **specific relays**. /// /// Rise an error if the [`ClientSigner`] is not set. - pub async fn send_event_builder_to( + pub async fn send_event_builder_to( &self, - url: U, + urls: I, builder: EventBuilder, ) -> Result where + I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { let event: Event = self.sign_event_builder(builder).await?; - self.send_event_to(url, event).await + self.send_event_to(urls, event).await } /// NIP44 encryption with [ClientSigner] diff --git a/crates/nostr-sdk/src/client/options.rs b/crates/nostr-sdk/src/client/options.rs index 2820e0d55..41a84a799 100644 --- a/crates/nostr-sdk/src/client/options.rs +++ b/crates/nostr-sdk/src/client/options.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::time::Duration; use crate::relay::RelayPoolOptions; +use crate::RelaySendOptions; pub(crate) const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_secs(20); /// Default Support Rust Nostr LUD16 @@ -95,8 +96,13 @@ impl Options { } } - pub(crate) fn get_wait_for_send(&self) -> bool { - self.wait_for_send.load(Ordering::SeqCst) + pub(crate) fn get_wait_for_send(&self) -> RelaySendOptions { + let skip_disconnected = self.get_skip_disconnected_relays(); + let wait_for_send = self.wait_for_send.load(Ordering::SeqCst); + RelaySendOptions::new() + .timeout(self.send_timeout) + .skip_send_confirmation(!wait_for_send) + .skip_disconnected(skip_disconnected) } /// If set to `true`, `Client` wait that a subscription msg is sent before continue (`subscribe` and `unsubscribe` methods) @@ -107,8 +113,13 @@ impl Options { } } - pub(crate) fn get_wait_for_subscription(&self) -> bool { - self.wait_for_subscription.load(Ordering::SeqCst) + pub(crate) fn get_wait_for_subscription(&self) -> RelaySendOptions { + let skip_disconnected = self.get_skip_disconnected_relays(); + let wait_for_subscription = self.wait_for_subscription.load(Ordering::SeqCst); + RelaySendOptions::new() + .timeout(self.send_timeout) + .skip_send_confirmation(!wait_for_subscription) + .skip_disconnected(skip_disconnected) } /// Set default POW diffficulty for `Event` diff --git a/crates/nostr-sdk/src/client/signer/nip46.rs b/crates/nostr-sdk/src/client/signer/nip46.rs index 55b872d67..7c8814555 100644 --- a/crates/nostr-sdk/src/client/signer/nip46.rs +++ b/crates/nostr-sdk/src/client/signer/nip46.rs @@ -123,7 +123,7 @@ impl Client { // Subscribe self.send_msg_to( - signer.relay_url(), + [signer.relay_url()], ClientMessage::req(id.clone(), vec![filter]), ) .await?; @@ -150,7 +150,7 @@ impl Client { .ok_or(Error::Timeout)??; // Unsubscribe - self.send_msg_to(signer.relay_url(), ClientMessage::close(id)) + self.send_msg_to([signer.relay_url()], ClientMessage::close(id)) .await?; } @@ -181,7 +181,7 @@ impl Client { .to_event(&signer.app_keys)?; // Send request to signer - self.send_event_to(signer.relay_url(), event).await?; + self.send_event_to([signer.relay_url()], event).await?; let sub_id = SubscriptionId::generate(); let filter = Filter::new() @@ -191,7 +191,7 @@ impl Client { // Subscribe self.send_msg_to( - signer.relay_url(), + [signer.relay_url()], ClientMessage::req(sub_id.clone(), vec![filter]), ) .await?; @@ -238,7 +238,7 @@ impl Client { // Unsubscribe self.send_msg_to( - signer.relay_url(), + [signer.relay_url()], ClientMessage::close(sub_id.clone()), ) .await?; @@ -248,7 +248,7 @@ impl Client { if let Some(error) = error { // Unsubscribe self.send_msg_to( - signer.relay_url(), + [signer.relay_url()], ClientMessage::close(sub_id.clone()), ) .await?; @@ -269,7 +269,7 @@ impl Client { time::timeout(timeout, future).await.ok_or(Error::Timeout)?; // Unsubscribe - self.send_msg_to(signer.relay_url(), ClientMessage::close(sub_id)) + self.send_msg_to([signer.relay_url()], ClientMessage::close(sub_id)) .await?; res diff --git a/crates/nostr-sdk/src/client/zapper.rs b/crates/nostr-sdk/src/client/zapper.rs index 6a63c6831..4faf94ffc 100644 --- a/crates/nostr-sdk/src/client/zapper.rs +++ b/crates/nostr-sdk/src/client/zapper.rs @@ -202,7 +202,7 @@ impl Client { let event_id = event.id; // Send request - self.send_event_to(uri.relay_url.clone(), event).await?; + self.send_event_to([uri.relay_url.clone()], event).await?; // Get response let relay = self.relay(uri.relay_url.clone()).await?; diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index bb4bc5329..6bc61dab5 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -55,6 +55,7 @@ use crate::RUNTIME; type Message = (RelayEvent, Option>); +const MIN_ATTEMPTS: usize = 1; const MIN_UPTIME: f64 = 0.90; #[cfg(not(target_arch = "wasm32"))] const PING_INTERVAL: u64 = 55; @@ -71,9 +72,6 @@ pub enum Error { /// Database error #[error(transparent)] Database(#[from] DatabaseError), - /// Channel timeout - #[error("channel timeout")] - ChannelTimeout, /// Message response timeout #[error("recv message response timeout")] RecvTimeout, @@ -86,6 +84,9 @@ pub enum Error { /// Relay not connected #[error("relay not connected")] NotConnected, + /// Relay not connected + #[error("relay not connected (status changed)")] + NotConnectedStatusChanged, /// Event not published #[error("event not published: {0}")] EventNotPublished(String), @@ -100,9 +101,6 @@ pub enum Error { /// Not published events not_published: HashMap, }, - /// Loop terminated - #[error("loop terminated")] - LoopTerminated, /// Batch event empty #[error("batch event cannot be empty")] BatchEventEmpty, @@ -189,12 +187,17 @@ impl fmt::Display for RelayStatus { } } +impl RelayStatus { + /// Check if is `disconnected`, `stopped` or `terminated` + fn is_disconnected(&self) -> bool { + matches!(self, Self::Disconnected | Self::Stopped | Self::Terminated) + } +} + /// Relay event #[derive(Debug)] pub enum RelayEvent { - /// Send [`ClientMessage`] - SendMsg(Box), - /// Send multiple messages at once + /// Send messages Batch(Vec), /// Ping #[cfg(not(target_arch = "wasm32"))] @@ -457,9 +460,7 @@ impl Relay { } fn schedule_for_stop(&self, value: bool) { - let _ = self - .scheduled_for_stop - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(value)); + self.scheduled_for_stop.store(value, Ordering::SeqCst); } fn is_scheduled_for_termination(&self) -> bool { @@ -467,9 +468,8 @@ impl Relay { } fn schedule_for_termination(&self, value: bool) { - let _ = - self.scheduled_for_termination - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(value)); + self.scheduled_for_termination + .store(value, Ordering::SeqCst); } /// Connect to relay and keep alive connection @@ -631,21 +631,15 @@ impl Relay { } let nonce: u64 = rand::thread_rng().gen(); - if relay.stats.ping.set_last_nonce(nonce) - && relay.stats.ping.set_replied(false) + relay.stats.ping.set_last_nonce(nonce); + relay.stats.ping.set_replied(false); + + if let Err(e) = + relay.send_relay_event(RelayEvent::Ping { nonce }, None) { - if let Err(e) = - relay.send_relay_event(RelayEvent::Ping { nonce }, None) - { - tracing::error!("Impossible to ping {}: {e}", relay.url); - break; - }; - } else { - tracing::warn!( - "`last_nonce` or `replied` not updated for {}!", - relay.url - ); - } + tracing::error!("Impossible to ping {}: {e}", relay.url); + break; + }; thread::sleep(Duration::from_secs(PING_INTERVAL)).await; } @@ -665,54 +659,27 @@ impl Relay { let mut rx = relay.relay_receiver.lock().await; while let Some((relay_event, oneshot_sender)) = rx.recv().await { match relay_event { - RelayEvent::SendMsg(msg) => { - let json = msg.as_json(); - let size: usize = json.as_bytes().len(); - tracing::debug!( - "Sending {json} to {} (size: {size} bytes)", - relay.url - ); - match ws_tx.send(WsMessage::Text(json)).await { - Ok(_) => { - relay.stats.add_bytes_sent(size); - if let Some(sender) = oneshot_sender { - if let Err(e) = sender.send(true) { - tracing::error!( - "Impossible to send oneshot msg: {}", - e - ); - } - } - } - Err(e) => { - tracing::error!( - "Impossible to send msg to {}: {}", - relay.url(), - e.to_string() + RelayEvent::Batch(msgs) => { + let msgs: Vec = + msgs.into_iter().map(|msg| msg.as_json()).collect(); + let size: usize = msgs.iter().map(|msg| msg.as_bytes().len()).sum(); + let len = msgs.len(); + + if len == 1 { + if let Some(json) = msgs.first() { + tracing::debug!( + "Sending {json} to {} (size: {size} bytes)", + relay.url ); - if let Some(sender) = oneshot_sender { - if let Err(e) = sender.send(false) { - tracing::error!( - "Impossible to send oneshot msg: {}", - e - ); - } - } - break; } + } else { + tracing::debug!( + "Sending {len} messages to {} (size: {size} bytes)", + relay.url + ); } - } - RelayEvent::Batch(msgs) => { - let len = msgs.len(); - let size: usize = - msgs.iter().map(|msg| msg.as_json().as_bytes().len()).sum(); - tracing::debug!( - "Sending {len} messages to {} (size: {size} bytes)", - relay.url - ); - let msgs = msgs - .into_iter() - .map(|msg| Ok(WsMessage::Text(msg.as_json()))); + + let msgs = msgs.into_iter().map(|msg| Ok(WsMessage::Text(msg))); let mut stream = futures_util::stream::iter(msgs); match ws_tx.send_all(&mut stream).await { Ok(_) => { @@ -895,6 +862,8 @@ impl Relay { break; } } + Err(Error::MessageHandle(MessageHandleError::EmptyMsg)) => { + } Err(e) => tracing::error!( "Impossible to handle relay message from {}: {e}", relay.url @@ -930,7 +899,10 @@ impl Relay { // Subscribe to relay if self.opts.flags.has_read() { - if let Err(e) = self.resubscribe_all(None).await { + if let Err(e) = self + .resubscribe_all(RelaySendOptions::default().skip_send_confirmation(true)) + .await + { tracing::error!( "Impossible to subscribe to {}: {}", self.url(), @@ -959,10 +931,7 @@ impl Relay { /// Disconnect from relay and set status to 'Disconnected' async fn disconnect(&self) -> Result<(), Error> { let status = self.status().await; - if status.ne(&RelayStatus::Disconnected) - && status.ne(&RelayStatus::Stopped) - && status.ne(&RelayStatus::Terminated) - { + if !status.is_disconnected() { self.send_relay_event(RelayEvent::Close, None)?; } Ok(()) @@ -972,10 +941,7 @@ impl Relay { pub async fn stop(&self) -> Result<(), Error> { self.schedule_for_stop(true); let status = self.status().await; - if status.ne(&RelayStatus::Disconnected) - && status.ne(&RelayStatus::Stopped) - && status.ne(&RelayStatus::Terminated) - { + if !status.is_disconnected() { self.send_relay_event(RelayEvent::Stop, None)?; } Ok(()) @@ -985,56 +951,22 @@ impl Relay { pub async fn terminate(&self) -> Result<(), Error> { self.schedule_for_termination(true); let status = self.status().await; - if status.ne(&RelayStatus::Disconnected) - && status.ne(&RelayStatus::Stopped) - && status.ne(&RelayStatus::Terminated) - { + if !status.is_disconnected() { self.send_relay_event(RelayEvent::Terminate, None)?; } Ok(()) } /// Send msg to relay - pub async fn send_msg(&self, msg: ClientMessage, wait: Option) -> Result<(), Error> { - if !self.opts.flags.has_write() { - if let ClientMessage::Event(_) = msg { - return Err(Error::WriteDisabled); - } - } - - if !self.opts.flags.has_read() { - if let ClientMessage::Req { .. } | ClientMessage::Close(_) = msg { - return Err(Error::ReadDisabled); - } - } - - match wait { - Some(timeout) => { - let (tx, rx) = oneshot::channel::(); - self.send_relay_event(RelayEvent::SendMsg(Box::new(msg)), Some(tx))?; - match time::timeout(Some(timeout), rx).await { - Some(result) => match result { - Ok(val) => { - if val { - Ok(()) - } else { - Err(Error::MessageNotSent) - } - } - Err(_) => Err(Error::OneShotRecvError), - }, - _ => Err(Error::RecvTimeout), - } - } - None => self.send_relay_event(RelayEvent::SendMsg(Box::new(msg)), None), - } + pub async fn send_msg(&self, msg: ClientMessage, opts: RelaySendOptions) -> Result<(), Error> { + self.batch_msg(vec![msg], opts).await } /// Send multiple [`ClientMessage`] at once pub async fn batch_msg( &self, msgs: Vec, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> { if !self.opts.flags.has_write() && msgs.iter().any(|msg| msg.is_event()) { return Err(Error::WriteDisabled); @@ -1044,83 +976,40 @@ impl Relay { return Err(Error::ReadDisabled); } - match wait { - Some(timeout) => { - let (tx, rx) = oneshot::channel::(); - self.send_relay_event(RelayEvent::Batch(msgs), Some(tx))?; - match time::timeout(Some(timeout), rx).await { - Some(result) => match result { - Ok(val) => { - if val { - Ok(()) - } else { - Err(Error::MessageNotSent) - } - } - Err(_) => Err(Error::OneShotRecvError), - }, - _ => Err(Error::RecvTimeout), - } - } - None => self.send_relay_event(RelayEvent::Batch(msgs), None), - } - } - - /// Send event and wait for `OK` relay msg - pub async fn send_event(&self, event: Event, opts: RelaySendOptions) -> Result { - let id: EventId = event.id(); - if opts.skip_disconnected && !self.is_connected().await - && self.stats.attempts() > 1 + && self.stats.attempts() > MIN_ATTEMPTS && self.stats.uptime() < MIN_UPTIME { - return Err(Error::EventNotPublished(String::from( - "relay not connected", - ))); + return Err(Error::NotConnected); } - time::timeout(Some(opts.timeout), async { - self.send_msg(ClientMessage::event(event), None).await?; - let mut notifications = self.notification_sender.subscribe(); - while let Ok(notification) = notifications.recv().await { - match notification { - RelayPoolNotification::Message { - relay_url, - message: - RelayMessage::Ok { - event_id, - status, - message, - }, - } => { - if self.url == relay_url && id == event_id { - if status { - return Ok(event_id); - } else { - return Err(Error::EventNotPublished(message)); - } - } - } - RelayPoolNotification::RelayStatus { relay_url, status } => { - if opts.skip_disconnected && relay_url == self.url { - if let RelayStatus::Disconnected - | RelayStatus::Stopped - | RelayStatus::Terminated = status - { - return Err(Error::EventNotPublished(String::from( - "relay not connected (status changed)", - ))); - } + if opts.skip_send_confirmation { + self.send_relay_event(RelayEvent::Batch(msgs), None) + } else { + let (tx, rx) = oneshot::channel::(); + self.send_relay_event(RelayEvent::Batch(msgs), Some(tx))?; + match time::timeout(Some(opts.timeout), rx).await { + Some(result) => match result { + Ok(val) => { + if val { + Ok(()) + } else { + Err(Error::MessageNotSent) } } - _ => (), - } + Err(_) => Err(Error::OneShotRecvError), + }, + _ => Err(Error::RecvTimeout), } - Err(Error::LoopTerminated) - }) - .await - .ok_or(Error::Timeout)? + } + } + + /// Send event and wait for `OK` relay msg + pub async fn send_event(&self, event: Event, opts: RelaySendOptions) -> Result { + let id: EventId = event.id(); + self.batch_event(vec![event], opts).await?; + Ok(id) } /// Send multiple [`Event`] at once @@ -1133,26 +1022,20 @@ impl Relay { return Err(Error::BatchEventEmpty); } - if opts.skip_disconnected - && !self.is_connected().await - && self.stats.attempts() > 1 - && self.stats.uptime() < MIN_UPTIME - { - return Err(Error::EventNotPublished(String::from( - "relay not connected", - ))); - } - - let mut msgs: Vec = Vec::with_capacity(events.len()); - let mut missing: HashSet = HashSet::new(); + let events_len: usize = events.len(); + let mut msgs: Vec = Vec::with_capacity(events_len); + let mut missing: HashSet = HashSet::with_capacity(events_len); for event in events.into_iter() { missing.insert(event.id()); msgs.push(ClientMessage::event(event)); } + // Batch send messages + self.batch_msg(msgs, opts).await?; + + // Hanlde responses time::timeout(Some(opts.timeout), async { - self.batch_msg(msgs, None).await?; let mut published: HashSet = HashSet::new(); let mut not_published: HashMap = HashMap::new(); let mut notifications = self.notification_sender.subscribe(); @@ -1168,6 +1051,14 @@ impl Relay { }, } => { if self.url == relay_url && missing.remove(&event_id) { + if events_len == 1 { + if status { + return Ok(()); + } else { + return Err(Error::EventNotPublished(message)); + } + } + if status { published.insert(event_id); } else { @@ -1176,15 +1067,13 @@ impl Relay { } } RelayPoolNotification::RelayStatus { relay_url, status } => { - if opts.skip_disconnected && relay_url == self.url { - if let RelayStatus::Disconnected - | RelayStatus::Stopped - | RelayStatus::Terminated = status - { - return Err(Error::EventNotPublished(String::from( - "relay not connected (status changed)", - ))); - } + if opts.skip_disconnected + && relay_url == self.url + && status.is_disconnected() + { + return Err(Error::EventNotPublished(String::from( + "relay not connected (status changed)", + ))); } } _ => (), @@ -1211,7 +1100,7 @@ impl Relay { } /// Subscribes relay with existing filter - async fn resubscribe_all(&self, wait: Option) -> Result<(), Error> { + async fn resubscribe_all(&self, opts: RelaySendOptions) -> Result<(), Error> { if !self.opts.flags.has_read() { return Err(Error::ReadDisabled); } @@ -1220,10 +1109,10 @@ impl Relay { for (internal_id, sub) in subscriptions.into_iter() { if !sub.filters.is_empty() { - self.send_msg(ClientMessage::req(sub.id.clone(), sub.filters), wait) + self.send_msg(ClientMessage::req(sub.id.clone(), sub.filters), opts) .await?; } else { - tracing::warn!("Subscription '{internal_id}' has empty filters"); + tracing::debug!("Subscription '{internal_id}' has empty filters"); } } @@ -1233,7 +1122,7 @@ impl Relay { async fn resubscribe( &self, internal_id: InternalSubscriptionId, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> { if !self.opts.flags.has_read() { return Err(Error::ReadDisabled); @@ -1243,7 +1132,7 @@ impl Relay { .subscription(&internal_id) .await .ok_or(Error::InternalIdNotFound)?; - self.send_msg(ClientMessage::req(sub.id, sub.filters), wait) + self.send_msg(ClientMessage::req(sub.id, sub.filters), opts) .await?; Ok(()) @@ -1255,9 +1144,9 @@ impl Relay { pub async fn subscribe( &self, filters: Vec, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> { - self.subscribe_with_internal_id(InternalSubscriptionId::Default, filters, wait) + self.subscribe_with_internal_id(InternalSubscriptionId::Default, filters, opts) .await } @@ -1266,7 +1155,7 @@ impl Relay { &self, internal_id: InternalSubscriptionId, filters: Vec, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> { if !self.opts.flags.has_read() { return Err(Error::ReadDisabled); @@ -1278,14 +1167,14 @@ impl Relay { self.update_subscription_filters(internal_id.clone(), filters) .await; - self.resubscribe(internal_id, wait).await + self.resubscribe(internal_id, opts).await } /// Unsubscribe /// /// Internal Subscription ID set to `InternalSubscriptionId::Default` - pub async fn unsubscribe(&self, wait: Option) -> Result<(), Error> { - self.unsubscribe_with_internal_id(InternalSubscriptionId::Default, wait) + pub async fn unsubscribe(&self, opts: RelaySendOptions) -> Result<(), Error> { + self.unsubscribe_with_internal_id(InternalSubscriptionId::Default, opts) .await } @@ -1293,7 +1182,7 @@ impl Relay { pub async fn unsubscribe_with_internal_id( &self, internal_id: InternalSubscriptionId, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> { if !self.opts.flags.has_read() { return Err(Error::ReadDisabled); @@ -1303,13 +1192,13 @@ impl Relay { let subscription = subscriptions .remove(&internal_id) .ok_or(Error::InternalIdNotFound)?; - self.send_msg(ClientMessage::close(subscription.id), wait) + self.send_msg(ClientMessage::close(subscription.id), opts) .await?; Ok(()) } /// Unsubscribe from all subscriptions - pub async fn unsubscribe_all(&self, wait: Option) -> Result<(), Error> { + pub async fn unsubscribe_all(&self, opts: RelaySendOptions) -> Result<(), Error> { if !self.opts.flags.has_read() { return Err(Error::ReadDisabled); } @@ -1317,7 +1206,7 @@ impl Relay { let subscriptions = self.subscriptions().await; for sub in subscriptions.into_values() { - self.send_msg(ClientMessage::close(sub.id.clone()), wait) + self.send_msg(ClientMessage::close(sub.id.clone()), opts) .await?; } @@ -1335,7 +1224,7 @@ impl Relay { F: Future, { if !self.is_connected().await - && self.stats.attempts() > 1 + && self.stats.attempts() > MIN_ATTEMPTS && self.stats.uptime() < MIN_UPTIME { return Err(Error::NotConnected); @@ -1383,8 +1272,8 @@ impl Relay { _ => (), }, RelayPoolNotification::RelayStatus { relay_url, status } => { - if relay_url == self.url && status != RelayStatus::Connected { - return Err(Error::NotConnected); + if relay_url == self.url && status.is_disconnected() { + return Err(Error::NotConnectedStatusChanged); } } RelayPoolNotification::Stop | RelayPoolNotification::Shutdown => break, @@ -1447,15 +1336,16 @@ impl Relay { } let id = SubscriptionId::generate(); + let send_opts = RelaySendOptions::default().skip_send_confirmation(true); - self.send_msg(ClientMessage::req(id.clone(), filters), None) + self.send_msg(ClientMessage::req(id.clone(), filters), send_opts) .await?; self.handle_events_of(id.clone(), timeout, opts, callback) .await?; // Unsubscribe - self.send_msg(ClientMessage::close(id), None).await?; + self.send_msg(ClientMessage::close(id), send_opts).await?; Ok(()) } @@ -1493,10 +1383,11 @@ impl Relay { let relay = self.clone(); thread::spawn(async move { let id = SubscriptionId::generate(); + let send_opts = RelaySendOptions::default().skip_send_confirmation(true); // Subscribe if let Err(e) = relay - .send_msg(ClientMessage::req(id.clone(), filters), None) + .send_msg(ClientMessage::req(id.clone(), filters), send_opts) .await { tracing::error!( @@ -1514,7 +1405,7 @@ impl Relay { } // Unsubscribe - if let Err(e) = relay.send_msg(ClientMessage::close(id), None).await { + if let Err(e) = relay.send_msg(ClientMessage::close(id), send_opts).await { tracing::error!( "Impossible to close subscription with {}: {}", relay.url(), @@ -1531,7 +1422,8 @@ impl Relay { timeout: Duration, ) -> Result { let id = SubscriptionId::generate(); - self.send_msg(ClientMessage::count(id.clone(), filters), None) + let send_opts = RelaySendOptions::default().skip_send_confirmation(true); + self.send_msg(ClientMessage::count(id.clone(), filters), send_opts) .await?; let mut count = 0; @@ -1559,7 +1451,7 @@ impl Relay { .ok_or(Error::Timeout)?; // Unsubscribe - self.send_msg(ClientMessage::close(id), None).await?; + self.send_msg(ClientMessage::close(id), send_opts).await?; Ok(count) } @@ -1578,7 +1470,7 @@ impl Relay { // Check if relay is connected if !self.is_connected().await - && self.stats.attempts() > 1 + && self.stats.attempts() > MIN_ATTEMPTS && self.stats.uptime() < MIN_UPTIME { return Err(Error::NotConnected); @@ -1594,9 +1486,9 @@ impl Relay { // Send initial negentropy message let sub_id = SubscriptionId::generate(); + let send_opts = RelaySendOptions::default().skip_send_confirmation(true); let open_msg = ClientMessage::neg_open(&mut negentropy, &sub_id, filter)?; - self.send_msg(open_msg, Some(Duration::from_secs(10))) - .await?; + self.send_msg(open_msg, send_opts).await?; let mut notifications = self.notification_sender.subscribe(); let mut temp_notifications = self.notification_sender.subscribe(); @@ -1692,7 +1584,7 @@ impl Relay { subscription_id: sub_id.clone(), message: query.to_hex(), }, - None, + send_opts, ) .await?; } @@ -1743,8 +1635,11 @@ impl Relay { match self.database.event_by_id(event_id).await { Ok(event) => { in_flight_up.insert(event_id); - self.send_msg(ClientMessage::event(event), None) - .await?; + self.send_msg( + ClientMessage::event(event), + send_opts, + ) + .await?; num_sent += 1; } Err(e) => tracing::error!("Couldn't upload event: {e}"), @@ -1783,7 +1678,7 @@ impl Relay { let filter = Filter::new().ids(ids); self.send_msg( ClientMessage::req(down_sub_id.clone(), vec![filter]), - None, + send_opts, ) .await?; @@ -1792,8 +1687,8 @@ impl Relay { } } RelayPoolNotification::RelayStatus { relay_url, status } => { - if relay_url == self.url && status != RelayStatus::Connected { - return Err(Error::NotConnected); + if relay_url == self.url && status.is_disconnected() { + return Err(Error::NotConnectedStatusChanged); } } RelayPoolNotification::Stop | RelayPoolNotification::Shutdown => break, @@ -1816,7 +1711,7 @@ impl Relay { let close_msg = ClientMessage::NegClose { subscription_id: sub_id, }; - self.send_msg(close_msg, None).await?; + self.send_msg(close_msg, send_opts).await?; Ok(()) } diff --git a/crates/nostr-sdk/src/relay/options.rs b/crates/nostr-sdk/src/relay/options.rs index a986e5b8b..2ed2bc3c2 100644 --- a/crates/nostr-sdk/src/relay/options.rs +++ b/crates/nostr-sdk/src/relay/options.rs @@ -162,16 +162,16 @@ impl RelayOptions { /// [`Relay`](super::Relay) send options #[derive(Debug, Clone, Copy)] pub struct RelaySendOptions { - /// Skip wait for disconnected relay (default: true) - pub skip_disconnected: bool, - /// Timeout for sending event (default: 10 secs) - pub timeout: Duration, + pub(super) skip_disconnected: bool, + pub(super) skip_send_confirmation: bool, + pub(super) timeout: Duration, } impl Default for RelaySendOptions { fn default() -> Self { Self { skip_disconnected: true, + skip_send_confirmation: false, timeout: DEFAULT_SEND_TIMEOUT, } } @@ -184,21 +184,23 @@ impl RelaySendOptions { } /// Skip wait for disconnected relay (default: true) - pub fn skip_disconnected(self, value: bool) -> Self { - Self { - skip_disconnected: value, - ..self - } + pub fn skip_disconnected(mut self, value: bool) -> Self { + self.skip_disconnected = value; + self + } + + /// Skip wait for confirmation that message is sent (default: false) + pub fn skip_send_confirmation(mut self, value: bool) -> Self { + self.skip_send_confirmation = value; + self } /// Timeout for sending event (default: 10 secs) /// /// If `None`, the default timeout will be used - pub fn timeout(self, value: Option) -> Self { - Self { - timeout: value.unwrap_or(DEFAULT_SEND_TIMEOUT), - ..self - } + pub fn timeout(mut self, timeout: Option) -> Self { + self.timeout = timeout.unwrap_or(DEFAULT_SEND_TIMEOUT); + self } } diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index 76bfa1ec7..470d68caf 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -55,18 +55,18 @@ pub enum Error { /// No relays #[error("no relays")] NoRelays, + /// No relays specified + #[error("no relays sepcified")] + NoRelaysSpecified, /// Msg not sent #[error("message not sent")] MsgNotSent, /// Msgs not sent #[error("messages not sent")] MsgsNotSent, - /// Event not published - #[error("event not published")] - EventNotPublished(EventId), - /// Events not published - #[error("events not published")] - EventsNotPublished, + /// Event/s not published + #[error("event/s not published")] + EventNotPublished, /// Relay not found #[error("relay not found")] RelayNotFound, @@ -155,9 +155,7 @@ impl RelayPoolTask { } fn set_running_to(&self, value: bool) { - let _ = self - .running - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(value)); + self.running.store(value, Ordering::SeqCst); } pub fn run(&self) { @@ -367,9 +365,7 @@ impl Drop for RelayPool { tracing::warn!("Relay Pool already dropped"); } else { tracing::debug!("Dropping the Relay Pool..."); - let _ = self - .dropped - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + self.dropped.store(true, Ordering::SeqCst); let pool = self.clone(); thread::spawn(async move { pool.shutdown() @@ -467,6 +463,11 @@ impl RelayPool { relays.clone() } + async fn internal_relay(&self, url: &Url) -> Result { + let relays = self.relays.read().await; + relays.get(url).cloned().ok_or(Error::RelayNotFound) + } + /// Get [`Relay`] pub async fn relay(&self, url: U) -> Result where @@ -474,8 +475,7 @@ impl RelayPool { Error: From<::Err>, { let url: Url = url.try_into_url()?; - let relays = self.relays.read().await; - relays.get(&url).cloned().ok_or(Error::RelayNotFound) + self.internal_relay(&url).await } /// Get subscription filters @@ -528,160 +528,121 @@ impl RelayPool { } /// Send client message - pub async fn send_msg(&self, msg: ClientMessage, wait: Option) -> Result<(), Error> { + pub async fn send_msg(&self, msg: ClientMessage, opts: RelaySendOptions) -> Result<(), Error> { let relays = self.relays().await; - - if relays.is_empty() { - return Err(Error::NoRelays); - } - - if let ClientMessage::Event(event) = &msg { - self.database.save_event(event).await?; - } - - let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); - let mut handles = Vec::new(); - - for (url, relay) in relays.into_iter() { - let msg = msg.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.send_msg(msg, wait).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); - } - Err(e) => tracing::error!("Impossible to send msg to {url}: {e}"), - } - }); - handles.push(handle); - } - - for handle in handles.into_iter().flatten() { - handle.join().await?; - } - - if !sent_to_at_least_one_relay.load(Ordering::SeqCst) { - return Err(Error::MsgNotSent); - } - - Ok(()) + self.send_msg_to(relays.into_keys(), msg, opts).await } /// Send multiple client messages at once pub async fn batch_msg( &self, msgs: Vec, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> { let relays = self.relays().await; - - if relays.is_empty() { - return Err(Error::NoRelays); - } - - // Save events into database - for msg in msgs.iter() { - if let ClientMessage::Event(event) = msg { - self.database.save_event(event).await?; - } - } - - let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); - let mut handles = Vec::new(); - - for (url, relay) in relays.into_iter() { - let len = msgs.len(); - let msgs = msgs.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.batch_msg(msgs, wait).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); - } - Err(e) => tracing::error!("Impossible to send {len} messages to {url}: {e}"), - } - }); - handles.push(handle); - } - - for handle in handles.into_iter().flatten() { - handle.join().await?; - } - - if !sent_to_at_least_one_relay.load(Ordering::SeqCst) { - return Err(Error::MsgNotSent); - } - - Ok(()) + self.batch_msg_to(relays.into_keys(), msgs, opts).await } - /// Send client message to a single relay - pub async fn send_msg_to( + /// Send client message to specific relays + /// + /// Note: **the relays must already be added!** + pub async fn send_msg_to( &self, - url: U, + urls: I, msg: ClientMessage, - wait: Option, + opts: RelaySendOptions, ) -> Result<(), Error> where + I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { - let url: Url = url.try_into_url()?; + self.batch_msg_to(urls, vec![msg], opts).await + } - if let ClientMessage::Event(event) = &msg { - self.database.save_event(event).await?; + /// Send multiple client messages at once to specific relays + /// + /// Note: **the relays must already be added!** + pub async fn batch_msg_to( + &self, + urls: I, + msgs: Vec, + opts: RelaySendOptions, + ) -> Result<(), Error> + where + I: IntoIterator, + U: TryIntoUrl, + Error: From<::Err>, + { + // Compose URLs + let urls: HashSet = urls + .into_iter() + .map(|u| u.try_into_url()) + .collect::>()?; + + // Check if urls set isn't empty + if urls.is_empty() { + return Err(Error::NoRelaysSpecified); } - let relays = self.relays().await; - if let Some(relay) = relays.get(&url) { - relay.send_msg(msg, wait).await?; - Ok(()) - } else { - Err(Error::RelayNotFound) + // Save events into database + for msg in msgs.iter() { + if let ClientMessage::Event(event) = msg { + self.database.save_event(event).await?; + } } - } - /// Send event and wait for `OK` relay msg - pub async fn send_event(&self, event: Event, opts: RelaySendOptions) -> Result { - let relays = self.relays().await; + // Get relays + let relays: HashMap = self.relays().await; if relays.is_empty() { return Err(Error::NoRelays); } - self.database.save_event(&event).await?; - - let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); - let mut handles = Vec::new(); + // If passed only 1 url, not use threads + if urls.len() == 1 { + let url: Url = urls.into_iter().next().ok_or(Error::RelayNotFound)?; + let relay: &Relay = relays.get(&url).ok_or(Error::RelayNotFound)?; + relay.batch_msg(msgs, opts).await?; + } else { + // Check if urls set contains ONLY already added relays + if !urls.iter().all(|url| relays.contains_key(url)) { + return Err(Error::RelayNotFound); + } - let event_id: EventId = event.id(); + let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); + let mut handles = Vec::with_capacity(urls.len()); - for (url, relay) in relays.into_iter() { - let event = event.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.send_event(event, opts).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + for (url, relay) in relays.into_iter().filter(|(url, ..)| urls.contains(url)) { + let msgs = msgs.clone(); + let sent = sent_to_at_least_one_relay.clone(); + let handle = thread::spawn(async move { + match relay.batch_msg(msgs, opts).await { + Ok(_) => { + sent.store(true, Ordering::SeqCst); + } + Err(e) => tracing::error!("Impossible to send msg to {url}: {e}"), } - Err(e) => tracing::error!("Impossible to send event to {url}: {e}"), - } - }); - handles.push(handle); - } + }); + handles.push(handle); + } - for handle in handles.into_iter().flatten() { - handle.join().await?; - } + for handle in handles.into_iter().flatten() { + handle.join().await?; + } - if !sent_to_at_least_one_relay.load(Ordering::SeqCst) { - return Err(Error::EventNotPublished(event_id)); + if !sent_to_at_least_one_relay.load(Ordering::SeqCst) { + return Err(Error::MsgNotSent); + } } - Ok(event_id) + Ok(()) + } + + /// Send event and wait for `OK` relay msg + pub async fn send_event(&self, event: Event, opts: RelaySendOptions) -> Result { + let relays: HashMap = self.relays().await; + self.send_event_to(relays.into_keys(), event, opts).await } /// Send multiple [`Event`] at once @@ -691,9 +652,47 @@ impl RelayPool { opts: RelaySendOptions, ) -> Result<(), Error> { let relays = self.relays().await; + self.batch_event_to(relays.into_keys(), events, opts).await + } - if relays.is_empty() { - return Err(Error::NoRelays); + /// Send event to a specific relays + pub async fn send_event_to( + &self, + urls: I, + event: Event, + opts: RelaySendOptions, + ) -> Result + where + I: IntoIterator, + U: TryIntoUrl, + Error: From<::Err>, + { + let event_id: EventId = event.id; + self.batch_event_to(urls, vec![event], opts).await?; + Ok(event_id) + } + + /// Send event to a specific relays + pub async fn batch_event_to( + &self, + urls: I, + events: Vec, + opts: RelaySendOptions, + ) -> Result<(), Error> + where + I: IntoIterator, + U: TryIntoUrl, + Error: From<::Err>, + { + // Compose URLs + let urls: HashSet = urls + .into_iter() + .map(|u| u.try_into_url()) + .collect::>()?; + + // Check if urls set isn't empty + if urls.is_empty() { + return Err(Error::NoRelaysSpecified); } // Save events into database @@ -701,66 +700,62 @@ impl RelayPool { self.database.save_event(event).await?; } - let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); - let mut handles = Vec::new(); + // Get relays + let relays: HashMap = self.relays().await; - for (url, relay) in relays.into_iter() { - let len = events.len(); - let events = events.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.batch_event(events, opts).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); - } - Err(e) => tracing::error!("Impossible to send {len} events to {url}: {e}"), - } - }); - handles.push(handle); + if relays.is_empty() { + return Err(Error::NoRelays); } - for handle in handles.into_iter().flatten() { - handle.join().await?; - } + // If passed only 1 url, not use threads + if urls.len() == 1 { + let url: Url = urls.into_iter().next().ok_or(Error::RelayNotFound)?; + let relay: &Relay = relays.get(&url).ok_or(Error::RelayNotFound)?; + relay.batch_event(events, opts).await?; + } else { + // Check if urls set contains ONLY already added relays + if !urls.iter().all(|url| relays.contains_key(url)) { + return Err(Error::RelayNotFound); + } - if !sent_to_at_least_one_relay.load(Ordering::SeqCst) { - return Err(Error::EventsNotPublished); - } + let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); + let mut handles = Vec::with_capacity(urls.len()); - Ok(()) - } + for (url, relay) in relays.into_iter().filter(|(url, ..)| urls.contains(url)) { + let events = events.clone(); + let sent = sent_to_at_least_one_relay.clone(); + let handle = thread::spawn(async move { + match relay.batch_event(events, opts).await { + Ok(_) => { + sent.store(true, Ordering::SeqCst); + } + Err(e) => tracing::error!("Impossible to send event to {url}: {e}"), + } + }); + handles.push(handle); + } - /// Send event to a single relay - pub async fn send_event_to( - &self, - url: U, - event: Event, - opts: RelaySendOptions, - ) -> Result - where - U: TryIntoUrl, - Error: From<::Err>, - { - let url: Url = url.try_into_url()?; - self.database.save_event(&event).await?; - let relays = self.relays().await; - if let Some(relay) = relays.get(&url) { - Ok(relay.send_event(event, opts).await?) - } else { - Err(Error::RelayNotFound) + for handle in handles.into_iter().flatten() { + handle.join().await?; + } + + if !sent_to_at_least_one_relay.load(Ordering::SeqCst) { + return Err(Error::EventNotPublished); + } } + + Ok(()) } /// Subscribe to filters /// /// Internal Subscription ID set to `InternalSubscriptionId::Pool` - pub async fn subscribe(&self, filters: Vec, wait: Option) { + pub async fn subscribe(&self, filters: Vec, opts: RelaySendOptions) { let relays = self.relays().await; self.update_subscription_filters(filters.clone()).await; for relay in relays.values() { if let Err(e) = relay - .subscribe_with_internal_id(InternalSubscriptionId::Pool, filters.clone(), wait) + .subscribe_with_internal_id(InternalSubscriptionId::Pool, filters.clone(), opts) .await { tracing::error!("{e}"); @@ -771,11 +766,11 @@ impl RelayPool { /// Unsubscribe from filters /// /// Internal Subscription ID set to `InternalSubscriptionId::Pool` - pub async fn unsubscribe(&self, wait: Option) { + pub async fn unsubscribe(&self, opts: RelaySendOptions) { let relays = self.relays().await; for relay in relays.values() { if let Err(e) = relay - .unsubscribe_with_internal_id(InternalSubscriptionId::Pool, wait) + .unsubscribe_with_internal_id(InternalSubscriptionId::Pool, opts) .await { tracing::error!("{e}"); @@ -785,56 +780,96 @@ impl RelayPool { /// Get events of filters /// - /// Get events from local database and relays + /// Get events both from **local database** and **relays** pub async fn get_events_of( &self, filters: Vec, timeout: Duration, opts: FilterOptions, ) -> Result, Error> { - // Get stored events - let stored_events: Vec = self - .database - .query(filters.clone(), Order::Desc) + let relays = self.relays().await; + self.get_events_from(relays.into_keys(), filters, timeout, opts) .await - .unwrap_or_default(); + } - // Compose IDs and Events collections - let ids: Arc>> = - Arc::new(Mutex::new(stored_events.iter().map(|e| e.id()).collect())); - let events: Arc>> = Arc::new(Mutex::new(stored_events)); + /// Get events of filters from specific relays + /// + /// Get events both from **local database** and **relays** + /// + /// If no relay is specified, will be queried only the database. + pub async fn get_events_from( + &self, + urls: I, + filters: Vec, + timeout: Duration, + opts: FilterOptions, + ) -> Result, Error> + where + I: IntoIterator, + U: TryIntoUrl, + Error: From<::Err>, + { + let urls: HashSet = urls + .into_iter() + .map(|u| u.try_into_url()) + .collect::>()?; + + if urls.is_empty() { + Ok(self.database.query(filters, Order::Desc).await?) + } else if urls.len() == 1 { + let url: Url = urls.into_iter().next().ok_or(Error::RelayNotFound)?; + let relay: Relay = self.internal_relay(&url).await?; + Ok(relay.get_events_of(filters, timeout, opts).await?) + } else { + let relays: HashMap = self.relays().await; - // Get relays and start query - let mut handles = Vec::new(); - let relays = self.relays().await; - for (url, relay) in relays.into_iter() { - let filters = filters.clone(); - let ids = ids.clone(); - let events = events.clone(); - let handle = thread::spawn(async move { - if let Err(e) = relay - .get_events_of_with_callback(filters, timeout, opts, |event| async { - let mut ids = ids.lock().await; - if !ids.contains(&event.id()) { - let mut events = events.lock().await; - ids.insert(event.id()); - events.push(event); - } - }) - .await - { - tracing::error!("Failed to get events from {url}: {e}"); - } - }); - handles.push(handle); - } + // Check if urls set contains ONLY already added relays + if !urls.iter().all(|url| relays.contains_key(url)) { + return Err(Error::RelayNotFound); + } - // Join threads - for handle in handles.into_iter().flatten() { - handle.join().await?; - } + let stored_events: Vec = self + .database + .query(filters.clone(), Order::Desc) + .await + .unwrap_or_default(); + + // Compose IDs and Events collections + let ids: Arc>> = + Arc::new(Mutex::new(stored_events.iter().map(|e| e.id()).collect())); + let events: Arc>> = Arc::new(Mutex::new(stored_events)); + + // Filter relays and start query + let mut handles = Vec::with_capacity(urls.len()); + for (url, relay) in relays.into_iter().filter(|(url, ..)| urls.contains(url)) { + let filters = filters.clone(); + let ids = ids.clone(); + let events = events.clone(); + let handle = thread::spawn(async move { + if let Err(e) = relay + .get_events_of_with_callback(filters, timeout, opts, |event| async { + let mut ids = ids.lock().await; + if !ids.contains(&event.id()) { + let mut events = events.lock().await; + ids.insert(event.id()); + events.push(event); + } + }) + .await + { + tracing::error!("Failed to get events from {url}: {e}"); + } + }); + handles.push(handle); + } - Ok(events.lock_owned().await.clone()) + // Join threads + for handle in handles.into_iter().flatten() { + handle.join().await?; + } + + Ok(events.lock_owned().await.clone()) + } } /// Request events of filter. @@ -853,6 +888,33 @@ impl RelayPool { } } + /// Request events of filter from specific relays. + /// + /// If the events aren't already stored in the database, will be sent to notification listener + /// until the EOSE "end of stored events" message is received from the relay. + pub async fn req_events_from( + &self, + urls: I, + filters: Vec, + timeout: Duration, + opts: FilterOptions, + ) -> Result<(), Error> + where + I: IntoIterator, + U: TryIntoUrl, + Error: From<::Err>, + { + let urls: HashSet = urls + .into_iter() + .map(|u| u.try_into_url()) + .collect::>()?; + let relays: HashMap = self.relays().await; + for (_, relay) in relays.into_iter().filter(|(url, ..)| urls.contains(url)) { + relay.req_events_of(filters.clone(), timeout, opts); + } + Ok(()) + } + /// Connect to all added relays and keep connection alive pub async fn connect(&self, connection_timeout: Option) { let relays: HashMap = self.relays().await; diff --git a/crates/nostr-sdk/src/relay/stats.rs b/crates/nostr-sdk/src/relay/stats.rs index ca0373414..87374c3e5 100644 --- a/crates/nostr-sdk/src/relay/stats.rs +++ b/crates/nostr-sdk/src/relay/stats.rs @@ -72,16 +72,12 @@ impl PingStats { *sent_at = Instant::now(); } - pub(crate) fn set_last_nonce(&self, nonce: u64) -> bool { - self.last_nonce - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(nonce)) - .is_ok() + pub(crate) fn set_last_nonce(&self, nonce: u64) { + self.last_nonce.store(nonce, Ordering::SeqCst) } - pub(crate) fn set_replied(&self, replied: bool) -> bool { - self.replied - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(replied)) - .is_ok() + pub(crate) fn set_replied(&self, replied: bool) { + self.replied.store(replied, Ordering::SeqCst); } } @@ -187,16 +183,10 @@ impl RelayConnectionStats { let now: u64 = Timestamp::now().as_u64(); - let _ = self - .connected_at - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(now)); + self.connected_at.store(now, Ordering::SeqCst); if self.first_connection_timestamp() == Timestamp::from(0) { - let _ = self.first_connection_timestamp.fetch_update( - Ordering::SeqCst, - Ordering::SeqCst, - |_| Some(now), - ); + self.first_connection_timestamp.store(now, Ordering::SeqCst); } }