From 084bd54cd782a62108ef921e8c585cf67abfa80f Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Mon, 9 Dec 2024 15:22:33 -0800 Subject: [PATCH] WIP: rough subscription manager API proposal --- Cargo.lock | 4 + crates/notedeck/Cargo.toml | 4 + crates/notedeck/src/lib.rs | 5 + crates/notedeck/src/submgr.rs | 371 ++++++++++++++++++++++++++ crates/notedeck/src/util/mod.rs | 4 + crates/notedeck/src/util/test_util.rs | 86 ++++++ 6 files changed, 474 insertions(+) create mode 100644 crates/notedeck/src/submgr.rs create mode 100644 crates/notedeck/src/util/mod.rs create mode 100644 crates/notedeck/src/util/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 133136c5..cc774e24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2530,18 +2530,22 @@ dependencies = [ "dirs", "egui", "enostr", + "futures", "hex", "image", + "nostr", "nostrdb", "poll-promise", "puffin 0.19.1 (git+https://github.com/jb55/puffin?rev=70ff86d5503815219b01a009afd3669b7903a057)", "security-framework", "serde", "serde_json", + "sha2", "strum", "strum_macros", "tempfile", "thiserror 2.0.7", + "tokio", "tracing", "url", "uuid", diff --git a/crates/notedeck/Cargo.toml b/crates/notedeck/Cargo.toml index 31bdfba2..3e2949cd 100644 --- a/crates/notedeck/Cargo.toml +++ b/crates/notedeck/Cargo.toml @@ -22,9 +22,13 @@ serde = { workspace = true } hex = { workspace = true } thiserror = { workspace = true } puffin = { workspace = true, optional = true } +futures = "0.3.31" [dev-dependencies] tempfile = { workspace = true } +tokio = { workspace = true } +nostr = { workspace = true } +sha2 = "0.10.8" [target.'cfg(target_os = "macos")'.dependencies] security-framework = { workspace = true } diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs index 50ad2c7d..0ad7bf4b 100644 --- a/crates/notedeck/src/lib.rs +++ b/crates/notedeck/src/lib.rs @@ -12,6 +12,7 @@ mod notecache; mod result; pub mod storage; mod style; +pub mod submgr; pub mod theme; mod theme_handler; mod time; @@ -20,6 +21,10 @@ pub mod ui; mod unknowns; mod user_account; +/// Various utilities +#[macro_use] +pub mod util; + pub use accounts::{AccountData, Accounts, AccountsAction, AddAccountAction}; pub use app::App; pub use args::Args; diff --git a/crates/notedeck/src/submgr.rs b/crates/notedeck/src/submgr.rs new file mode 100644 index 00000000..b43fce61 --- /dev/null +++ b/crates/notedeck/src/submgr.rs @@ -0,0 +1,371 @@ +#![allow(unused)] + +use futures::StreamExt; +use std::cmp::Ordering; +use std::collections::BTreeMap; +use std::error::Error; +use std::fmt; +use thiserror::Error; + +use enostr::Filter; +use nostrdb::{self, Config, Ndb, NoteKey, Subscription, SubscriptionStream}; + +/// The Subscription Manager +/// +/// NOTE - This interface wishes it was called Subscriptions but there +/// already is one. Using a lame (but short) placeholder name instead +/// for now ... +/// +/// ```no_run +/// use std::error::Error; +/// +/// use nostrdb::{Config, Ndb}; +/// use enostr::Filter; +/// use notedeck::submgr::{SubConstraint, SubMgr, SubSpecBuilder, SubError}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let mut ndb = Ndb::new("the/db/path/", &Config::new())?; +/// let mut submgr = SubMgr::new(ndb.clone()); +/// +/// // Define a filter and build the subscription specification +/// let filter = Filter::new().kinds(vec![1, 2, 3]).build(); +/// let spec = SubSpecBuilder::new() +/// .filters(vec![filter]) +/// .constraint(SubConstraint::Local) +/// .build(); +/// +/// // Subscribe and obtain a SubReceiver +/// let mut receiver = submgr.subscribe(spec)?; +/// +/// // Process incoming note keys +/// loop { +/// match receiver.next().await { +/// Ok(note_keys) => { +/// // Process the note keys +/// println!("Received note keys: {:?}", note_keys); +/// }, +/// Err(SubError::StreamEnded) => { +/// // Not really an error; we should clean up +/// break; +/// }, +/// Err(err) => { +/// // Handle other errors +/// eprintln!("Error: {:?}", err); +/// break; +/// }, +/// } +/// } +/// +/// // Unsubscribe when the subscription is no longer needed +/// submgr.unsubscribe(&receiver)?; +/// +/// Ok(()) +/// } +/// ``` + +#[derive(Debug, Error)] +pub enum SubError { + #[error("Stream ended")] + StreamEnded, + + #[error("Internal error: {0}")] + InternalError(String), + + #[error("nostrdb error: {0}")] + NdbError(#[from] nostrdb::Error), +} + +pub type SubResult = Result; + +#[derive(Debug, Clone, Copy)] +pub struct SubId(nostrdb::Subscription); + +impl From for SubId { + fn from(subscription: Subscription) -> Self { + SubId(subscription) + } +} + +impl Ord for SubId { + fn cmp(&self, other: &Self) -> Ordering { + self.0.id().cmp(&other.0.id()) + } +} + +impl PartialOrd for SubId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for SubId { + fn eq(&self, other: &Self) -> bool { + self.0.id() == other.0.id() + } +} + +impl Eq for SubId {} + +#[derive(Debug, Clone)] +pub enum SubConstraint { + OneShot, // terminate subscription after initial query + Local, // only query the local db, no remote subs + OutboxRelays(Vec), // ensure one of these is in the active relay set + AllowedRelays(Vec), // if not empty, only use these relays + BlockedRelays(Vec), // if not empty, don't use these relays +} + +#[derive(Debug, Default)] +pub struct SubSpecBuilder { + rmtid: Option, + filters: Vec, + constraints: Vec, +} + +impl SubSpecBuilder { + pub fn new() -> Self { + SubSpecBuilder::default() + } + pub fn rmtid(mut self, id: String) -> Self { + self.rmtid = Some(id); + self + } + pub fn filters(mut self, filters: Vec) -> Self { + self.filters.extend(filters); + self + } + pub fn constraint(mut self, constraint: SubConstraint) -> Self { + self.constraints.push(constraint); + self + } + pub fn build(self) -> SubSpec { + let mut outbox_relays = Vec::new(); + let mut allowed_relays = Vec::new(); + let mut blocked_relays = Vec::new(); + let mut is_oneshot = false; + let mut is_local = false; + + for constraint in self.constraints { + match constraint { + SubConstraint::OneShot => is_oneshot = true, + SubConstraint::Local => is_local = true, + SubConstraint::OutboxRelays(relays) => outbox_relays.extend(relays), + SubConstraint::AllowedRelays(relays) => allowed_relays.extend(relays), + SubConstraint::BlockedRelays(relays) => blocked_relays.extend(relays), + } + } + + SubSpec { + rmtid: self.rmtid, + filters: self.filters, + outbox_relays, + allowed_relays, + blocked_relays, + is_oneshot, + is_local, + } + } +} + +#[derive(Debug, Clone)] +pub struct SubSpec { + rmtid: Option, + filters: Vec, + outbox_relays: Vec, + allowed_relays: Vec, + blocked_relays: Vec, + is_oneshot: bool, + is_local: bool, +} + +pub struct SubMgr { + ndb: Ndb, + subs: BTreeMap, +} + +impl SubMgr { + pub fn new(ndb: Ndb) -> Self { + SubMgr { + ndb, + subs: BTreeMap::new(), + } + } + + pub fn subscribe(&mut self, spec: SubSpec) -> SubResult { + let receiver = self.make_subscription(&spec)?; + self.subs.insert(receiver.id, spec); + Ok(receiver) + } + + pub fn unsubscribe(&mut self, rcvr: &SubReceiver) -> SubResult<()> { + self.subs.remove(&rcvr.id); + Ok(()) + } + + fn make_subscription(&mut self, sub: &SubSpec) -> SubResult { + let subscription = self.ndb.subscribe(&sub.filters)?; + let mut stream = subscription.stream(&self.ndb).notes_per_await(1); + Ok(SubReceiver::new( + self.ndb.clone(), + subscription.into(), + stream, + )) + } +} + +pub struct SubReceiver { + ndb: Ndb, // if the streams's ndb was accessible we could use that instead + id: SubId, + stream: SubscriptionStream, +} + +impl SubReceiver { + pub fn new(ndb: Ndb, id: SubId, stream: SubscriptionStream) -> Self { + SubReceiver { ndb, id, stream } + } + + pub async fn next(&mut self) -> SubResult> { + self.stream.next().await.ok_or(SubError::StreamEnded) + } + + pub fn poll(&mut self, max_notes: u32) -> Vec { + self.ndb.poll_for_notes(self.id.0, max_notes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::testdbs_path_async; + use crate::util::test_util::{raw_msg, test_keypair, ManagedNdb}; + use nostrdb::Transaction; + + // test basic subscription functionality + #[tokio::test] + async fn test_submgr_sub() -> Result<(), Box> { + // setup an ndb and submgr to test + let (mndb, mut ndb) = ManagedNdb::setup(&testdbs_path_async!()); + let mut submgr = SubMgr::new(ndb.clone()); + + // subscribe to some stuff + let mut receiver = submgr.subscribe( + SubSpecBuilder::new() + .filters(vec![Filter::new().kinds(vec![1]).build()]) + .constraint(SubConstraint::Local) + .build(), + )?; + + // nothing should be available yet + assert_eq!(receiver.poll(1), vec![]); + + // process a test event that matches the subscription + let keys1 = test_keypair(1); + let kind = 1; + let content = "abc"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + + // receiver should now see the msg + let nks = receiver.next().await?; + assert_eq!(nks.len(), 1); + let txn = Transaction::new(&ndb)?; + let note = ndb.get_note_by_key(&txn, nks[0])?; + assert_eq!(note.pubkey(), keys1.pubkey.bytes()); + assert_eq!(note.kind(), kind); + assert_eq!(note.content(), content); + + // now nothing should be available again + assert_eq!(receiver.poll(1), vec![]); + + submgr.unsubscribe(&receiver)?; + Ok(()) + } + + // ensure that the subscription works when it is waiting before the event + #[tokio::test] + async fn test_submgr_sub_with_waiting_thread() -> Result<(), Box> { + // setup an ndb and submgr to test + let (mndb, mut ndb) = ManagedNdb::setup(&testdbs_path_async!()); + let mut submgr = SubMgr::new(ndb.clone()); + + // subscribe to some stuff + let mut receiver = submgr.subscribe( + SubSpecBuilder::new() + .filters(vec![Filter::new().kinds(vec![1]).build()]) + .constraint(SubConstraint::Local) + .build(), + )?; + + // spawn a task to wait for the next message + let handle = tokio::spawn(async move { + let nks = receiver.next().await.unwrap(); + assert_eq!(nks.len(), 1); // Ensure one message is received + (receiver, nks) // return the receiver as well + }); + + // process a test event that matches the subscription + let keys1 = test_keypair(1); + let kind = 1; + let content = "abc"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + + // await the spawned task to ensure it completes + let (mut receiver, nks) = handle.await?; + + // validate the received message + let txn = Transaction::new(&ndb)?; + let note = ndb.get_note_by_key(&txn, nks[0])?; + assert_eq!(note.pubkey(), keys1.pubkey.bytes()); + assert_eq!(note.kind(), kind); + assert_eq!(note.content(), content); + + // ensure no additional messages are available + assert_eq!(receiver.poll(1), vec![]); + + submgr.unsubscribe(&receiver)?; + Ok(()) + } + + // test subscription poll and next interaction + #[tokio::test] + async fn test_submgr_poll_and_next() -> Result<(), Box> { + // setup an ndb and submgr to test + let (mndb, mut ndb) = ManagedNdb::setup(&testdbs_path_async!()); + let mut submgr = SubMgr::new(ndb.clone()); + + // subscribe to some stuff + let mut receiver = submgr.subscribe( + SubSpecBuilder::new() + .filters(vec![Filter::new().kinds(vec![1]).build()]) + .constraint(SubConstraint::Local) + .build(), + )?; + + // nothing should be available yet + assert_eq!(receiver.poll(1), vec![]); + + // process a test event that matches the subscription + let keys1 = test_keypair(1); + let kind = 1; + let content = "abc"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + std::thread::sleep(std::time::Duration::from_millis(150)); + + // now poll should consume the note + assert_eq!(receiver.poll(1), vec![NoteKey::new(1)]); + + // nothing more available + assert_eq!(receiver.poll(1), vec![]); + + // process a second event + let content = "def"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + + // now receiver should now see the second note + assert_eq!(receiver.next().await?, vec![NoteKey::new(2)]); + + submgr.unsubscribe(&receiver)?; + Ok(()) + } +} diff --git a/crates/notedeck/src/util/mod.rs b/crates/notedeck/src/util/mod.rs new file mode 100644 index 00000000..0964f7ae --- /dev/null +++ b/crates/notedeck/src/util/mod.rs @@ -0,0 +1,4 @@ +#[allow(missing_docs)] +#[cfg(test)] +#[macro_use] +pub mod test_util; diff --git a/crates/notedeck/src/util/test_util.rs b/crates/notedeck/src/util/test_util.rs new file mode 100644 index 00000000..aaaa6a78 --- /dev/null +++ b/crates/notedeck/src/util/test_util.rs @@ -0,0 +1,86 @@ +use enostr::{FullKeypair, Pubkey}; +use nostrdb::{Config, Ndb, NoteBuilder}; +use std::fs; +use std::path::Path; + +// FIXME - make nostrdb::test_util::cleanup_db accessible instead +#[allow(dead_code)] +fn cleanup_db(path: &str) { + let p = Path::new(path); + let _ = fs::remove_file(p.join("data.mdb")); + let _ = fs::remove_file(p.join("lock.mdb")); +} + +// managed ndb handle that cleans up test data when dropped +pub struct ManagedNdb { + pub path: String, + pub ndb: Ndb, +} +impl ManagedNdb { + pub fn setup(path: &str) -> (Self, Ndb) { + cleanup_db(path); // ensure a clean slate before starting + let ndb = Ndb::new(path, &Config::new()) + .unwrap_or_else(|err| panic!("Failed to create Ndb at {}: {}", path, err)); + ( + Self { + path: path.to_string(), + ndb: ndb.clone(), + }, + ndb, + ) + } +} +impl Drop for ManagedNdb { + fn drop(&mut self) { + cleanup_db(&self.path); // comment this out to leave the db for inspection + } +} + +// generate a testdbs_path for an async test automatically +#[macro_export] +macro_rules! testdbs_path_async { + () => {{ + fn f() {} + fn type_name_of(_: T) -> &'static str { + core::any::type_name::() + } + let name = type_name_of(f); + + // Find and cut the rest of the path + let test_name = match &name[..name.len() - 3].strip_suffix("::{{closure}}") { + Some(stripped) => match &stripped.rfind(':') { + Some(pos) => &stripped[pos + 1..stripped.len()], + None => &stripped, + }, + None => &name[..name.len() - 3], + }; + + format!("target/testdbs/{}", test_name) + }}; +} + +// generate a deterministic keypair for testing +pub fn test_keypair(input: u64) -> FullKeypair { + use sha2::{Digest, Sha256}; + + let mut hasher = Sha256::new(); + hasher.update(input.to_le_bytes()); + let hash = hasher.finalize(); + + let secret_key = nostr::SecretKey::from_slice(&hash).expect("valid secret key"); + let (xopk, _) = secret_key.x_only_public_key(&nostr::SECP256K1); + let pubkey = Pubkey::new(xopk.serialize()); + + FullKeypair::new(pubkey, secret_key) +} + +// generate a basic raw message from scratch +pub fn raw_msg(subid: &str, keys: &FullKeypair, kind: u32, content: &str) -> String { + let note = NoteBuilder::new() + .kind(kind) + .content(content) + .sign(&keys.secret_key.to_secret_bytes()) + .build() + .expect("note"); + format!(r#"["EVENT", "{}", {}]"#, subid, note.json().expect("json")) +}