diff --git a/crates/nostr-sdk-pool/src/relay.rs b/crates/nostr-sdk-pool/src/relay.rs index 17cecd4ba..30e96c58d 100644 --- a/crates/nostr-sdk-pool/src/relay.rs +++ b/crates/nostr-sdk-pool/src/relay.rs @@ -60,6 +60,9 @@ pub enum Error { /// Database error #[error(transparent)] Database(#[from] DatabaseError), + /// Thread error + #[error(transparent)] + Thread(#[from] thread::Error), /// Message response timeout #[error("recv message response timeout")] RecvTimeout, @@ -984,6 +987,111 @@ impl Relay { } } + /// Send `REQ` to relay + /// + /// If `close_on` is `Some(..)`, the `REQ` will be automatically closed (depending on [FilterOptions]). + pub async fn send_req( + &self, + id: SubscriptionId, + filters: Vec, + close_on: Option, + ) -> Result<(), Error> { + self.send_msg( + ClientMessage::req(id.clone(), filters), + RelaySendOptions::new().skip_send_confirmation(false), + ) + .await?; + + if let Some(opts) = close_on { + let relay = self.clone(); + thread::spawn(async move { + let mut counter = 0; + let mut received_eose: bool = false; + + let mut notifications = relay.notification_sender.subscribe(); + while let Ok(notification) = notifications.recv().await { + match notification { + RelayPoolNotification::Message { message, .. } => match message { + RelayMessage::Event { + subscription_id, .. + } => { + if subscription_id.eq(&id) { + if let FilterOptions::WaitForEventsAfterEOSE(num) = opts { + if received_eose { + counter += 1; + if counter >= num { + break; + } + } + } + } + } + RelayMessage::EndOfStoredEvents(subscription_id) => { + if subscription_id.eq(&id) { + tracing::debug!( + "Received EOSE for subscription {id} from {}", + relay.url + ); + received_eose = true; + if let FilterOptions::ExitOnEOSE + | FilterOptions::WaitDurationAfterEOSE(_) = opts + { + break; + } + } + } + _ => (), + }, + RelayPoolNotification::RelayStatus { relay_url, status } => { + if relay_url == relay.url && status.is_disconnected() { + return Ok(()); // No need to send CLOSE msg + } + } + RelayPoolNotification::Stop | RelayPoolNotification::Shutdown => { + return Ok(()); // No need to send CLOSE msg + } + _ => (), + } + } + + if let FilterOptions::WaitDurationAfterEOSE(duration) = opts { + time::timeout(Some(duration), async { + while let Ok(notification) = notifications.recv().await { + match notification { + RelayPoolNotification::RelayStatus { relay_url, status } => { + if relay_url == relay.url && status.is_disconnected() { + return Ok(()); // No need to send CLOSE msg + } + } + RelayPoolNotification::Stop | RelayPoolNotification::Shutdown => { + return Ok(()); // No need to send CLOSE msg + } + _ => (), + } + } + + Ok::<(), Error>(()) + }) + .await; + } + + // Unsubscribe + relay + .send_msg( + ClientMessage::close(id.clone()), + RelaySendOptions::default(), + ) + .await?; + + tracing::debug!("Subscription {id} auto-closed"); + + Ok::<(), Error>(()) + })?; + } + + Ok(()) + } + /// Send event and wait for `OK` relay msg pub async fn send_event(&self, event: Event, opts: RelaySendOptions) -> Result { let id: EventId = event.id(); @@ -1282,7 +1390,7 @@ impl Relay { } } RelayPoolNotification::RelayStatus { relay_url, status } => { - if relay_url == self.url && status != RelayStatus::Connected { + if relay_url == self.url && status.is_disconnected() { return Err(Error::NotConnected); } } @@ -1315,16 +1423,10 @@ impl Relay { } let id = SubscriptionId::generate(); - let send_opts = RelaySendOptions::default().skip_send_confirmation(true); - self.send_msg(ClientMessage::req(id.clone(), filters), send_opts) - .await?; + self.send_req(id.clone(), filters, Some(opts)).await?; - self.handle_events_of(id.clone(), timeout, opts, callback) - .await?; - - // Unsubscribe - self.send_msg(ClientMessage::close(id), send_opts).await?; + self.handle_events_of(id, timeout, opts, callback).await?; Ok(()) } diff --git a/crates/nostr-sdk/src/client/zapper.rs b/crates/nostr-sdk/src/client/zapper.rs index 375ed9250..0ec99f795 100644 --- a/crates/nostr-sdk/src/client/zapper.rs +++ b/crates/nostr-sdk/src/client/zapper.rs @@ -12,7 +12,7 @@ use async_utility::time; use lnurl_pay::api::Lud06OrLud16; use lnurl_pay::{LightningAddress, LnUrl}; use nostr::prelude::*; -use nostr_sdk_pool::{RelayPoolNotification, RelaySendOptions}; +use nostr_sdk_pool::{FilterOptions, RelayPoolNotification}; #[cfg(all(feature = "webln", target_arch = "wasm32"))] use webln::WebLN; @@ -235,9 +235,10 @@ impl Client { // Subscribe relay - .send_msg( - ClientMessage::req(id.clone(), vec![filter]), - RelaySendOptions::new().skip_send_confirmation(false), + .send_req( + id, + vec![filter], + Some(FilterOptions::WaitForEventsAfterEOSE(1)), ) .await?; @@ -265,10 +266,6 @@ impl Client { }) .await .ok_or(Error::Timeout)??; - - // Unsubscribe - self.send_msg_to([uri.relay_url.clone()], ClientMessage::close(id)) - .await?; } Ok(())