diff --git a/Cargo.toml b/Cargo.toml index c988ba5..6f6fb91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,12 @@ default = ["console_error_panic_hook"] [dependencies] cfg-if = "0.1.2" -worker = { version = "0.0.18", features = ["queue"] } +worker = { version = "0.0.18", features = ["queue", "d1"] } futures = "0.3.26" futures-util = { version = "0.3", default-features = false } nostr = { version = "0.22.0", default-features = false, features = ["nip11"] } serde = { version = "^1.0", features = ["derive"] } +serde_json = "1.0.67" # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires diff --git a/migrations/0000_events.sql b/migrations/0000_events.sql new file mode 100644 index 0000000..ac38d0c --- /dev/null +++ b/migrations/0000_events.sql @@ -0,0 +1,38 @@ +-- Migration number: 0000 2023-08-10T16:43:44.275Z +-- Event Table +CREATE TABLE IF NOT EXISTS event ( + id BLOB PRIMARY KEY, -- A unique 4-byte hash representing the event. + created_at INTEGER NOT NULL, -- When the event was authored. + pubkey BLOB NOT NULL, -- Author's public key. + kind INTEGER NOT NULL, -- Event kind/type. + content TEXT NOT NULL, -- Serialized json of event object. + sig BLOB NOT NULL, -- Signature + deleted INTEGER NOT NULL DEFAULT 0 -- Soft delete flag, 0 = not deleted, 1 = deleted +); + +-- Tag Table +CREATE TABLE IF NOT EXISTS tag ( + id INTEGER PRIMARY KEY, -- Auto-incrementing ID for tags. + event_id BLOB NOT NULL, -- Link to the event that the tag is associated with. + name TEXT NOT NULL, -- The tag's name. + value TEXT, -- The tag's value. + FOREIGN KEY(event_id) REFERENCES event(id) ON DELETE CASCADE + UNIQUE(event_id, name, value) -- Ensure that the combination is unique +); + +-- Indexes for faster queries + +-- Index to speed up look-ups by the pubkey of an event. +CREATE INDEX IF NOT EXISTS pubkey_index ON event(pubkey); + +-- Index to speed up look-ups by the kind/type of an event. +CREATE INDEX IF NOT EXISTS kind_index ON event(kind); + +-- Index to link tags to events. +CREATE INDEX IF NOT EXISTS tag_event_index ON tag(event_id); + +-- Index to speed up look-ups by tag names. +CREATE INDEX IF NOT EXISTS tag_name_index ON tag(name); + +-- Index to speed up look-ups by tag values. +CREATE INDEX IF NOT EXISTS tag_value_index ON tag(value); diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..081760c --- /dev/null +++ b/src/db.rs @@ -0,0 +1,262 @@ +use std::collections::HashMap; + +use ::nostr::{Event, Kind, RelayMessage}; +use nostr::{EventId, Tag, TagKind, Timestamp}; +use serde::Deserialize; +use serde_json::Value; +use wasm_bindgen::prelude::JsValue; +use worker::D1Database; +use worker::*; + +#[derive(Debug, Deserialize)] +struct EventRow { + id: EventId, + pubkey: nostr::secp256k1::XOnlyPublicKey, + created_at: Timestamp, + kind: Kind, + tags: Option>, + content: String, + sig: nostr::secp256k1::schnorr::Signature, +} + +#[derive(Deserialize)] +struct TagRow { + event_id: EventId, + name: String, + value: String, +} + +pub async fn get_nwc_events(keys: &[String], kind: Kind, db: &D1Database) -> Result> { + // Determine the event kind + match kind { + Kind::WalletConnectResponse => (), + Kind::WalletConnectRequest => (), + _ => return Ok(vec![]), // skip other event types + }; + + console_log!("querying for ({keys:?}) and {}", kind.as_u32()); + + // Query for the events first, without the tags + let placeholders: String = keys.iter().map(|_| "?").collect::>().join(", "); + let query_str = format!( + r#" + SELECT * FROM event + WHERE pubkey IN ({}) AND kind = ? AND deleted = 0 + "#, + placeholders + ); + let mut stmt = db.prepare(&query_str); + let mut bindings = Vec::with_capacity(keys.len() + 1); // +1 for the kind afterwards + for key in keys.iter() { + bindings.push(JsValue::from_str(key)); + } + bindings.push(JsValue::from_f64(kind.as_u32() as f64)); + stmt = stmt.bind(&bindings)?; + + let result = stmt.all().await.map_err(|e| { + console_log!("Failed to fetch nwc events: {}", e); + format!("Failed to fetch nwc events: {}", e) + })?; + + let mut events: Vec = result + .results::()? + .iter() + .map(|row| { + let e: EventRow = serde_json::from_value(row.clone()).map_err(|e| { + console_log!("failed to parse event: {}", e); + worker::Error::from(format!( + "Failed to deserialize event from row ({}): {}", + row, e + )) + })?; + Ok(Event { + id: e.id, + pubkey: e.pubkey, + created_at: e.created_at, + kind: e.kind, + tags: e.tags.unwrap_or_default(), + content: e.content, + sig: e.sig, + }) + }) + .collect::>>()?; + + // Now get all the tags for all the events found + let event_ids: Vec = events.iter().map(|e| e.id).collect(); + let placeholders: String = event_ids.iter().map(|_| "?").collect::>().join(", "); + let tag_query_str = format!( + r#" + SELECT event_id, name, value FROM tag + WHERE event_id IN ({}) + ORDER BY id ASC + "#, + placeholders + ); + let mut tag_stmt = db.prepare(&tag_query_str); + let bindings: Vec = event_ids + .iter() + .map(|id| JsValue::from_str(&id.to_string())) + .collect(); + tag_stmt = tag_stmt.bind(&bindings)?; + + let tag_result = tag_stmt + .all() + .await + .map_err(|e| format!("Failed to fetch tags: {}", e))?; + + let tags: Vec = tag_result.results::()?; + let mut tags_map: HashMap> = HashMap::new(); + + for tag_row in tags { + if let Ok(tag) = Tag::parse(vec![tag_row.name, tag_row.value]) { + tags_map + .entry(tag_row.event_id) + .or_insert_with(Vec::new) + .push(tag); + } + } + + for event in &mut events { + if let Some(tags) = tags_map.remove(&event.id) { + event.tags.extend(tags); + } + } + + // Tag ordering could screw up signature, though it shouldn't matter + // for NWC messages because those should only have one tag. + // Also we insert tags in order and do an ORDER BY id so it should be fine. + events.retain(|event| match event.verify() { + Ok(_) => true, + Err(e) => { + console_log!("Verification failed for event with id {}: {}", event.id, e); + false + } + }); + + Ok(events) +} + +pub async fn handle_nwc_event(event: Event, db: &D1Database) -> Result> { + // Create the main event insertion query. + let event_insert_query = worker::query!( + db, + r#" + INSERT OR IGNORE INTO event (id, created_at, pubkey, kind, content, sig) + VALUES (?, ?, ?, ?, ?, ?) + "#, + &event.id, + &event.created_at, + &event.pubkey, + &event.kind, + &event.content, + &event.sig + )?; + + // Create a vector of tag insertion queries. + let mut tag_insert_queries: Vec<_> = event + .tags + .iter() + .map(|tag| { + worker::query!( + db, + r#" + INSERT OR IGNORE INTO tag (event_id, name, value) + VALUES (?, ?, ?) + "#, + &event.id, + &tag.kind().to_string(), + &tag.as_vec().get(1) + ) + .expect("should compile query") + }) + .collect(); + + // Combine the main event and tag insertion queries. + let mut batch_queries = vec![event_insert_query]; + batch_queries.append(&mut tag_insert_queries); + + // Run the batch queries. + let mut results = db.batch(batch_queries).await?.into_iter(); + + // Check the result of the main event insertion. + if let Some(error_msg) = results.next().and_then(|res| res.error()) { + console_log!("error saving nwc event to event table: {}", error_msg); + let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg); + return Ok(Some(relay_msg)); + } + + // Check the results for the tag insertions. + for tag_insert_result in results { + if let Some(error_msg) = tag_insert_result.error() { + console_log!("error saving tag to tag table: {}", error_msg); + let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg); + return Ok(Some(relay_msg)); + } + } + + let relay_msg = RelayMessage::new_ok(event.id, true, ""); + Ok(Some(relay_msg)) +} + +/// When a NWC request has been fulfilled, soft delete the request from the database +pub async fn delete_nwc_request(event: Event, db: &D1Database) -> Result<()> { + // Filter only relevant events + match event.kind { + Kind::WalletConnectResponse => (), + _ => return Ok(()), // skip other event types + }; + + let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned(); + let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned(); + + if let Some(Tag::PubKey(pubkey, ..)) = p_tag { + if let Some(Tag::Event(event_id, ..)) = e_tag { + // Soft delete the event based on pubkey and event_id + match worker::query!( + db, + "UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?", + &pubkey.to_string(), + &event_id + )? + .run() + .await + { + Ok(_) => (), + Err(e) => { + console_log!("error soft deleting nwc event from database: {e}"); + return Ok(()); + } + } + } + } + + Ok(()) +} + +/// When a NWC response has been fulfilled, soft delete the response from the database +pub async fn delete_nwc_response(event: &Event, db: &D1Database) -> Result<()> { + // Filter only relevant events + match event.kind { + Kind::WalletConnectResponse => (), + _ => return Ok(()), // skip other event types + }; + + // Soft delete the event based on pubkey and id + match worker::query!( + db, + "UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?", + &event.pubkey.to_string(), + &event.id + )? + .run() + .await + { + Ok(_) => console_log!("soft deleted nwc response event: {}", event.id), + Err(e) => { + console_log!("error soft deleting nwc event from database: {e}"); + return Ok(()); + } + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 5951e28..de0b2dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,13 @@ -use crate::nostr::get_nip11_response; -use crate::nostr::NOSTR_QUEUE_10; -use crate::nostr::NOSTR_QUEUE_7; use crate::nostr::NOSTR_QUEUE_8; use crate::nostr::NOSTR_QUEUE_9; pub(crate) use crate::nostr::{ try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5, NOSTR_QUEUE_6, }; -use ::nostr::{ - ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind, -}; +use crate::{db::delete_nwc_request, nostr::NOSTR_QUEUE_10}; +use crate::{db::get_nwc_events, nostr::NOSTR_QUEUE_7}; +use crate::{db::handle_nwc_event, nostr::get_nip11_response}; +use ::nostr::{ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId}; use futures::StreamExt; use futures_util::lock::Mutex; use serde::{Deserialize, Serialize}; @@ -19,6 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use worker::*; +mod db; mod error; mod nostr; mod utils; @@ -113,12 +112,14 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { return relay_response(relay_msg); }; + let db = ctx.d1("DB")?; + // if the event is a nostr wallet connect event, we // should save it and not send to other relays. if let Some(relay_msg) = - handle_nwc_event(*event.clone(), &ctx).await? + handle_nwc_event(*event.clone(), &db).await? { - if let Err(e) = delete_nwc_request(*event, &ctx).await { + if let Err(e) = delete_nwc_request(*event, &db).await { console_log!("failed to delete nwc request: {e}"); } return relay_response(relay_msg); @@ -274,10 +275,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { continue; }; + let db = ctx.d1("DB").expect("should have DB"); + // if the event is a nostr wallet connect event, we // should save it and not send to other relays. if let Some(response) = - handle_nwc_event(*event.clone(), &ctx) + handle_nwc_event(*event.clone(), &db) .await .expect("failed to handle nwc event") { @@ -285,7 +288,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { .send_with_str(&response.as_json()) .expect("failed to send response"); - if let Err(e) = delete_nwc_request(*event, &ctx).await { + if let Err(e) = delete_nwc_request(*event, &db).await { console_log!("failed to delete nwc request: {e}"); } @@ -382,7 +385,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { // set running thread to true running_thread.swap(true, Ordering::Relaxed); - let ctx_clone = ctx.clone(); + let db = ctx.d1("DB").expect("should have DB"); let sub_id = subscription_id.clone(); let server_clone = server.clone(); let master_clone = requested_filters.clone(); @@ -397,7 +400,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { sub_id.clone(), master.clone(), &server_clone, - &ctx_clone, + &db, ).await { Ok(new_event_ids) => { @@ -500,7 +503,7 @@ pub async fn handle_filter( subscription_id: SubscriptionId, filter: Filter, server: &WebSocket, - ctx: &RouteContext<()>, + db: &D1Database, ) -> Result> { let mut events = vec![]; // get all authors and pubkeys @@ -519,7 +522,7 @@ pub async fn handle_filter( .unwrap_or_default() .contains(&Kind::WalletConnectRequest) { - let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, ctx) + let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, db) .await .unwrap_or_default(); @@ -534,7 +537,7 @@ pub async fn handle_filter( .unwrap_or_default() .contains(&Kind::WalletConnectResponse) { - let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, ctx) + let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, db) .await .unwrap_or_default(); @@ -567,138 +570,6 @@ pub async fn handle_filter( Ok(sent_event_ids) } -pub async fn get_nwc_events( - keys: &[String], - kind: Kind, - ctx: &RouteContext<()>, -) -> Result> { - let kv_store = match kind { - Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, - Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?, - _ => return Ok(vec![]), // skip other event types, todo, we may want to store info events as well - }; - - let mut events = vec![]; - for key in keys { - let nwc_events = kv_store - .get(key) - .json::>() - .await? - .unwrap_or_default(); - for nwc_event in nwc_events { - // delete responses since we don't care after sending - if kind == Kind::WalletConnectResponse { - if let Err(e) = delete_nwc_response(&nwc_event, ctx).await { - console_log!("failed to delete nwc response: {e}"); - } - } - events.push(nwc_event); - } - } - - Ok(events) -} - -pub async fn handle_nwc_event( - event: Event, - ctx: &RouteContext<()>, -) -> Result> { - let kv_store = match event.kind { - Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, - Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?, - _ => return Ok(None), // skip other event types, todo, we may want to store info events as well - }; - - console_log!("got a wallet connect event: {}", event.id); - - let key = &event.pubkey.to_string(); - - let new_nwc_responses = match kv_store.get(key).json::>().await { - Ok(Some(mut current)) => { - current.push(event.clone()); - current - } - Ok(None) => vec![event.clone()], - Err(e) => { - console_log!("error getting nwc events from KV: {e}"); - let relay_msg = - RelayMessage::new_ok(event.id, false, "error: could not save published note"); - return Ok(Some(relay_msg)); - } - }; - - // save new vector of events - if let Err(e) = kv_store.put(key, new_nwc_responses)?.execute().await { - console_log!("error saving nwc: {e}"); - let relay_msg = - RelayMessage::new_ok(event.id, false, "error: could not save published note"); - return Ok(Some(relay_msg)); - } - console_log!("saved nwc event: {}", event.id); - - let relay_msg = RelayMessage::new_ok(event.id, true, ""); - Ok(Some(relay_msg)) -} - -/// When a NWC request has been fulfilled, delete the request from KV -pub async fn delete_nwc_request(event: Event, ctx: &RouteContext<()>) -> Result<()> { - let kv_store = match event.kind { - Kind::WalletConnectResponse => ctx.kv("NWC_REQUESTS")?, - _ => return Ok(()), // skip other event types - }; - - let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned(); - let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned(); - - if let Some(Tag::PubKey(pubkey, ..)) = p_tag { - if let Some(Tag::Event(event_id, ..)) = e_tag { - let key = &pubkey.to_string(); - match kv_store.get(key).json::>().await { - Ok(Some(current)) => { - let new_events: Vec = - current.into_iter().filter(|e| e.id != event_id).collect(); - - // save new vector of events - kv_store.put(key, new_events)?.execute().await?; - console_log!("deleted nwc request event: {}", event_id); - } - Ok(None) => return Ok(()), - Err(e) => { - console_log!("error getting nwc events from KV: {e}"); - return Ok(()); - } - }; - }; - }; - - Ok(()) -} - -pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Result<()> { - let kv_store = match event.kind { - Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, - _ => return Ok(()), // skip other event types - }; - - let key = &event.pubkey.to_string(); - match kv_store.get(key).json::>().await { - Ok(Some(current)) => { - let new_events: Vec = current.into_iter().filter(|e| e.id != event.id).collect(); - - // save new vector of events - kv_store.put(key, new_events)?.execute().await?; - console_log!("deleted nwc response event: {}", event.id); - } - Ok(None) => return Ok(()), - Err(e) => { - console_log!("error getting nwc events from KV: {e}"); - return Ok(()); - } - }; - - Ok(()) -} - // Helper function to extend a vector without duplicates fn extend_without_duplicates(master: &mut Vec, new: &Vec) { for item in new { diff --git a/wrangler.toml b/wrangler.toml index c7f2a3e..e060212 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -29,6 +29,16 @@ kv_namespaces = [ { binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" }, ] +[[d1_databases]] +binding = "DB" +database_name = "blastr-db" +database_id = "82736a30-841d-4c24-87d8-788763dacb01" + +[[env.staging.d1_databases]] +binding = "DB" +database_name = "blastr-db-staging" +database_id = "ba4c227b-edf2-46a4-99fa-4b7bfa036800" + [env.staging.vars] WORKERS_RS_VERSION = "0.0.18" ENVIRONMENT = "staging"