From faa2cc2234f3c7ec8fbf16773198c7b2d03937d1 Mon Sep 17 00:00:00 2001 From: Tony Giorgio Date: Thu, 10 Aug 2023 08:12:58 -0500 Subject: [PATCH 1/3] Bump workers-rs to 0.0.18 --- Cargo.toml | 3 ++- wrangler.toml | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9e46f5b..c988ba5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ name = "blastr" version = "0.0.0" edition = "2018" +resolver = "2" [lib] crate-type = ["cdylib", "rlib"] @@ -11,7 +12,7 @@ default = ["console_error_panic_hook"] [dependencies] cfg-if = "0.1.2" -worker = { version = "0.0.13", features = ["queue"] } +worker = { version = "0.0.18", features = ["queue"] } futures = "0.3.26" futures-util = { version = "0.3", default-features = false } nostr = { version = "0.22.0", default-features = false, features = ["nip11"] } diff --git a/wrangler.toml b/wrangler.toml index 5001f13..c7f2a3e 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -30,11 +30,11 @@ kv_namespaces = [ ] [env.staging.vars] -WORKERS_RS_VERSION = "0.0.13" +WORKERS_RS_VERSION = "0.0.18" ENVIRONMENT = "staging" [vars] -WORKERS_RS_VERSION = "0.0.13" +WORKERS_RS_VERSION = "0.0.18" ENVIRONMENT = "production" # Replace with all the queues you created, if you named them different. @@ -172,4 +172,4 @@ ENVIRONMENT = "production" binding = "nostr-events-pub-10-b" [build] -command = "cargo install -q worker-build --version 0.0.9 && worker-build --release" +command = "cargo install -q worker-build --version 0.0.10 && worker-build --release" From 9be88467cea9c6a1a2d6da869be8eecba6ca2ff9 Mon Sep 17 00:00:00 2001 From: Tony Giorgio Date: Fri, 11 Aug 2023 11:18:08 -0500 Subject: [PATCH 2/3] Verify events --- src/lib.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 6832376..5951e28 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,19 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { ClientMessage::Event(event) => { console_log!("got an event from client: {}", event.id); + match event.verify() { + Ok(()) => (), + Err(e) => { + console_log!("could not verify event {}: {}", event.id, e); + let relay_msg = RelayMessage::new_ok( + event.id, + false, + "invalid event", + ); + return relay_response(relay_msg); + } + } + // check if disallowed event kind if DISALLOWED_EVENT_KINDS.contains(&event.kind.as_u32()) { console_log!( @@ -204,6 +217,21 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { match client_msg { ClientMessage::Event(event) => { console_log!("got an event from client: {}", event.id); + match event.verify() { + Ok(()) => (), + Err(e) => { + console_log!("could not verify event {}: {}", event.id, e); + let relay_msg = RelayMessage::new_ok( + event.id, + false, + "disallowed event kind", + ); + server + .send_with_str(&relay_msg.as_json()) + .expect("failed to send response"); + continue; + } + } // check if disallowed event kind if DISALLOWED_EVENT_KINDS.contains(&event.kind.as_u32()) { From d21613707cb37eac9deaa1d62078e76bdfadb0d9 Mon Sep 17 00:00:00 2001 From: Tony Giorgio Date: Thu, 10 Aug 2023 22:30:31 -0500 Subject: [PATCH 3/3] Implement nostr event db --- Cargo.toml | 3 +- migrations/0000_events.sql | 15 +++ migrations/0001_tag.sql | 15 +++ src/db.rs | 262 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 165 +++-------------------- wrangler.toml | 10 ++ 6 files changed, 322 insertions(+), 148 deletions(-) create mode 100644 migrations/0000_events.sql create mode 100644 migrations/0001_tag.sql create mode 100644 src/db.rs diff --git a/Cargo.toml b/Cargo.toml index c988ba5..6f6fb91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,12 @@ default = ["console_error_panic_hook"] [dependencies] cfg-if = "0.1.2" -worker = { version = "0.0.18", features = ["queue"] } +worker = { version = "0.0.18", features = ["queue", "d1"] } futures = "0.3.26" futures-util = { version = "0.3", default-features = false } nostr = { version = "0.22.0", default-features = false, features = ["nip11"] } serde = { version = "^1.0", features = ["derive"] } +serde_json = "1.0.67" # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires diff --git a/migrations/0000_events.sql b/migrations/0000_events.sql new file mode 100644 index 0000000..2e66474 --- /dev/null +++ b/migrations/0000_events.sql @@ -0,0 +1,15 @@ +-- Migration number: 0000 2023-08-10T16:43:44.275Z +DROP TABLE IF EXISTS event; + +CREATE TABLE IF NOT EXISTS event ( + id BLOB PRIMARY KEY, + created_at INTEGER NOT NULL, + pubkey BLOB NOT NULL, + kind INTEGER NOT NULL, + content TEXT NOT NULL, + sig BLOB NOT NULL, + deleted INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS pubkey_index ON event(pubkey); +CREATE INDEX IF NOT EXISTS kind_index ON event(kind); diff --git a/migrations/0001_tag.sql b/migrations/0001_tag.sql new file mode 100644 index 0000000..cb5c3b0 --- /dev/null +++ b/migrations/0001_tag.sql @@ -0,0 +1,15 @@ +-- Migration number: 0001 2023-08-11T20:22:11.351Z +DROP TABLE IF EXISTS tag; + +CREATE TABLE IF NOT EXISTS tag ( + id INTEGER PRIMARY KEY, + event_id BLOB NOT NULL, + name TEXT NOT NULL, + value TEXT, + FOREIGN KEY(event_id) REFERENCES event(id) ON DELETE CASCADE + UNIQUE(event_id, name, value) +); + +CREATE INDEX IF NOT EXISTS tag_event_index ON tag(event_id); +CREATE INDEX IF NOT EXISTS tag_name_index ON tag(name); +CREATE INDEX IF NOT EXISTS tag_value_index ON tag(value); diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..081760c --- /dev/null +++ b/src/db.rs @@ -0,0 +1,262 @@ +use std::collections::HashMap; + +use ::nostr::{Event, Kind, RelayMessage}; +use nostr::{EventId, Tag, TagKind, Timestamp}; +use serde::Deserialize; +use serde_json::Value; +use wasm_bindgen::prelude::JsValue; +use worker::D1Database; +use worker::*; + +#[derive(Debug, Deserialize)] +struct EventRow { + id: EventId, + pubkey: nostr::secp256k1::XOnlyPublicKey, + created_at: Timestamp, + kind: Kind, + tags: Option>, + content: String, + sig: nostr::secp256k1::schnorr::Signature, +} + +#[derive(Deserialize)] +struct TagRow { + event_id: EventId, + name: String, + value: String, +} + +pub async fn get_nwc_events(keys: &[String], kind: Kind, db: &D1Database) -> Result> { + // Determine the event kind + match kind { + Kind::WalletConnectResponse => (), + Kind::WalletConnectRequest => (), + _ => return Ok(vec![]), // skip other event types + }; + + console_log!("querying for ({keys:?}) and {}", kind.as_u32()); + + // Query for the events first, without the tags + let placeholders: String = keys.iter().map(|_| "?").collect::>().join(", "); + let query_str = format!( + r#" + SELECT * FROM event + WHERE pubkey IN ({}) AND kind = ? AND deleted = 0 + "#, + placeholders + ); + let mut stmt = db.prepare(&query_str); + let mut bindings = Vec::with_capacity(keys.len() + 1); // +1 for the kind afterwards + for key in keys.iter() { + bindings.push(JsValue::from_str(key)); + } + bindings.push(JsValue::from_f64(kind.as_u32() as f64)); + stmt = stmt.bind(&bindings)?; + + let result = stmt.all().await.map_err(|e| { + console_log!("Failed to fetch nwc events: {}", e); + format!("Failed to fetch nwc events: {}", e) + })?; + + let mut events: Vec = result + .results::()? + .iter() + .map(|row| { + let e: EventRow = serde_json::from_value(row.clone()).map_err(|e| { + console_log!("failed to parse event: {}", e); + worker::Error::from(format!( + "Failed to deserialize event from row ({}): {}", + row, e + )) + })?; + Ok(Event { + id: e.id, + pubkey: e.pubkey, + created_at: e.created_at, + kind: e.kind, + tags: e.tags.unwrap_or_default(), + content: e.content, + sig: e.sig, + }) + }) + .collect::>>()?; + + // Now get all the tags for all the events found + let event_ids: Vec = events.iter().map(|e| e.id).collect(); + let placeholders: String = event_ids.iter().map(|_| "?").collect::>().join(", "); + let tag_query_str = format!( + r#" + SELECT event_id, name, value FROM tag + WHERE event_id IN ({}) + ORDER BY id ASC + "#, + placeholders + ); + let mut tag_stmt = db.prepare(&tag_query_str); + let bindings: Vec = event_ids + .iter() + .map(|id| JsValue::from_str(&id.to_string())) + .collect(); + tag_stmt = tag_stmt.bind(&bindings)?; + + let tag_result = tag_stmt + .all() + .await + .map_err(|e| format!("Failed to fetch tags: {}", e))?; + + let tags: Vec = tag_result.results::()?; + let mut tags_map: HashMap> = HashMap::new(); + + for tag_row in tags { + if let Ok(tag) = Tag::parse(vec![tag_row.name, tag_row.value]) { + tags_map + .entry(tag_row.event_id) + .or_insert_with(Vec::new) + .push(tag); + } + } + + for event in &mut events { + if let Some(tags) = tags_map.remove(&event.id) { + event.tags.extend(tags); + } + } + + // Tag ordering could screw up signature, though it shouldn't matter + // for NWC messages because those should only have one tag. + // Also we insert tags in order and do an ORDER BY id so it should be fine. + events.retain(|event| match event.verify() { + Ok(_) => true, + Err(e) => { + console_log!("Verification failed for event with id {}: {}", event.id, e); + false + } + }); + + Ok(events) +} + +pub async fn handle_nwc_event(event: Event, db: &D1Database) -> Result> { + // Create the main event insertion query. + let event_insert_query = worker::query!( + db, + r#" + INSERT OR IGNORE INTO event (id, created_at, pubkey, kind, content, sig) + VALUES (?, ?, ?, ?, ?, ?) + "#, + &event.id, + &event.created_at, + &event.pubkey, + &event.kind, + &event.content, + &event.sig + )?; + + // Create a vector of tag insertion queries. + let mut tag_insert_queries: Vec<_> = event + .tags + .iter() + .map(|tag| { + worker::query!( + db, + r#" + INSERT OR IGNORE INTO tag (event_id, name, value) + VALUES (?, ?, ?) + "#, + &event.id, + &tag.kind().to_string(), + &tag.as_vec().get(1) + ) + .expect("should compile query") + }) + .collect(); + + // Combine the main event and tag insertion queries. + let mut batch_queries = vec![event_insert_query]; + batch_queries.append(&mut tag_insert_queries); + + // Run the batch queries. + let mut results = db.batch(batch_queries).await?.into_iter(); + + // Check the result of the main event insertion. + if let Some(error_msg) = results.next().and_then(|res| res.error()) { + console_log!("error saving nwc event to event table: {}", error_msg); + let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg); + return Ok(Some(relay_msg)); + } + + // Check the results for the tag insertions. + for tag_insert_result in results { + if let Some(error_msg) = tag_insert_result.error() { + console_log!("error saving tag to tag table: {}", error_msg); + let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg); + return Ok(Some(relay_msg)); + } + } + + let relay_msg = RelayMessage::new_ok(event.id, true, ""); + Ok(Some(relay_msg)) +} + +/// When a NWC request has been fulfilled, soft delete the request from the database +pub async fn delete_nwc_request(event: Event, db: &D1Database) -> Result<()> { + // Filter only relevant events + match event.kind { + Kind::WalletConnectResponse => (), + _ => return Ok(()), // skip other event types + }; + + let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned(); + let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned(); + + if let Some(Tag::PubKey(pubkey, ..)) = p_tag { + if let Some(Tag::Event(event_id, ..)) = e_tag { + // Soft delete the event based on pubkey and event_id + match worker::query!( + db, + "UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?", + &pubkey.to_string(), + &event_id + )? + .run() + .await + { + Ok(_) => (), + Err(e) => { + console_log!("error soft deleting nwc event from database: {e}"); + return Ok(()); + } + } + } + } + + Ok(()) +} + +/// When a NWC response has been fulfilled, soft delete the response from the database +pub async fn delete_nwc_response(event: &Event, db: &D1Database) -> Result<()> { + // Filter only relevant events + match event.kind { + Kind::WalletConnectResponse => (), + _ => return Ok(()), // skip other event types + }; + + // Soft delete the event based on pubkey and id + match worker::query!( + db, + "UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?", + &event.pubkey.to_string(), + &event.id + )? + .run() + .await + { + Ok(_) => console_log!("soft deleted nwc response event: {}", event.id), + Err(e) => { + console_log!("error soft deleting nwc event from database: {e}"); + return Ok(()); + } + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 5951e28..de0b2dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,13 @@ -use crate::nostr::get_nip11_response; -use crate::nostr::NOSTR_QUEUE_10; -use crate::nostr::NOSTR_QUEUE_7; use crate::nostr::NOSTR_QUEUE_8; use crate::nostr::NOSTR_QUEUE_9; pub(crate) use crate::nostr::{ try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5, NOSTR_QUEUE_6, }; -use ::nostr::{ - ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind, -}; +use crate::{db::delete_nwc_request, nostr::NOSTR_QUEUE_10}; +use crate::{db::get_nwc_events, nostr::NOSTR_QUEUE_7}; +use crate::{db::handle_nwc_event, nostr::get_nip11_response}; +use ::nostr::{ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId}; use futures::StreamExt; use futures_util::lock::Mutex; use serde::{Deserialize, Serialize}; @@ -19,6 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use worker::*; +mod db; mod error; mod nostr; mod utils; @@ -113,12 +112,14 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { return relay_response(relay_msg); }; + let db = ctx.d1("DB")?; + // if the event is a nostr wallet connect event, we // should save it and not send to other relays. if let Some(relay_msg) = - handle_nwc_event(*event.clone(), &ctx).await? + handle_nwc_event(*event.clone(), &db).await? { - if let Err(e) = delete_nwc_request(*event, &ctx).await { + if let Err(e) = delete_nwc_request(*event, &db).await { console_log!("failed to delete nwc request: {e}"); } return relay_response(relay_msg); @@ -274,10 +275,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { continue; }; + let db = ctx.d1("DB").expect("should have DB"); + // if the event is a nostr wallet connect event, we // should save it and not send to other relays. if let Some(response) = - handle_nwc_event(*event.clone(), &ctx) + handle_nwc_event(*event.clone(), &db) .await .expect("failed to handle nwc event") { @@ -285,7 +288,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { .send_with_str(&response.as_json()) .expect("failed to send response"); - if let Err(e) = delete_nwc_request(*event, &ctx).await { + if let Err(e) = delete_nwc_request(*event, &db).await { console_log!("failed to delete nwc request: {e}"); } @@ -382,7 +385,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { // set running thread to true running_thread.swap(true, Ordering::Relaxed); - let ctx_clone = ctx.clone(); + let db = ctx.d1("DB").expect("should have DB"); let sub_id = subscription_id.clone(); let server_clone = server.clone(); let master_clone = requested_filters.clone(); @@ -397,7 +400,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { sub_id.clone(), master.clone(), &server_clone, - &ctx_clone, + &db, ).await { Ok(new_event_ids) => { @@ -500,7 +503,7 @@ pub async fn handle_filter( subscription_id: SubscriptionId, filter: Filter, server: &WebSocket, - ctx: &RouteContext<()>, + db: &D1Database, ) -> Result> { let mut events = vec![]; // get all authors and pubkeys @@ -519,7 +522,7 @@ pub async fn handle_filter( .unwrap_or_default() .contains(&Kind::WalletConnectRequest) { - let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, ctx) + let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, db) .await .unwrap_or_default(); @@ -534,7 +537,7 @@ pub async fn handle_filter( .unwrap_or_default() .contains(&Kind::WalletConnectResponse) { - let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, ctx) + let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, db) .await .unwrap_or_default(); @@ -567,138 +570,6 @@ pub async fn handle_filter( Ok(sent_event_ids) } -pub async fn get_nwc_events( - keys: &[String], - kind: Kind, - ctx: &RouteContext<()>, -) -> Result> { - let kv_store = match kind { - Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, - Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?, - _ => return Ok(vec![]), // skip other event types, todo, we may want to store info events as well - }; - - let mut events = vec![]; - for key in keys { - let nwc_events = kv_store - .get(key) - .json::>() - .await? - .unwrap_or_default(); - for nwc_event in nwc_events { - // delete responses since we don't care after sending - if kind == Kind::WalletConnectResponse { - if let Err(e) = delete_nwc_response(&nwc_event, ctx).await { - console_log!("failed to delete nwc response: {e}"); - } - } - events.push(nwc_event); - } - } - - Ok(events) -} - -pub async fn handle_nwc_event( - event: Event, - ctx: &RouteContext<()>, -) -> Result> { - let kv_store = match event.kind { - Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, - Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?, - _ => return Ok(None), // skip other event types, todo, we may want to store info events as well - }; - - console_log!("got a wallet connect event: {}", event.id); - - let key = &event.pubkey.to_string(); - - let new_nwc_responses = match kv_store.get(key).json::>().await { - Ok(Some(mut current)) => { - current.push(event.clone()); - current - } - Ok(None) => vec![event.clone()], - Err(e) => { - console_log!("error getting nwc events from KV: {e}"); - let relay_msg = - RelayMessage::new_ok(event.id, false, "error: could not save published note"); - return Ok(Some(relay_msg)); - } - }; - - // save new vector of events - if let Err(e) = kv_store.put(key, new_nwc_responses)?.execute().await { - console_log!("error saving nwc: {e}"); - let relay_msg = - RelayMessage::new_ok(event.id, false, "error: could not save published note"); - return Ok(Some(relay_msg)); - } - console_log!("saved nwc event: {}", event.id); - - let relay_msg = RelayMessage::new_ok(event.id, true, ""); - Ok(Some(relay_msg)) -} - -/// When a NWC request has been fulfilled, delete the request from KV -pub async fn delete_nwc_request(event: Event, ctx: &RouteContext<()>) -> Result<()> { - let kv_store = match event.kind { - Kind::WalletConnectResponse => ctx.kv("NWC_REQUESTS")?, - _ => return Ok(()), // skip other event types - }; - - let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned(); - let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned(); - - if let Some(Tag::PubKey(pubkey, ..)) = p_tag { - if let Some(Tag::Event(event_id, ..)) = e_tag { - let key = &pubkey.to_string(); - match kv_store.get(key).json::>().await { - Ok(Some(current)) => { - let new_events: Vec = - current.into_iter().filter(|e| e.id != event_id).collect(); - - // save new vector of events - kv_store.put(key, new_events)?.execute().await?; - console_log!("deleted nwc request event: {}", event_id); - } - Ok(None) => return Ok(()), - Err(e) => { - console_log!("error getting nwc events from KV: {e}"); - return Ok(()); - } - }; - }; - }; - - Ok(()) -} - -pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Result<()> { - let kv_store = match event.kind { - Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, - _ => return Ok(()), // skip other event types - }; - - let key = &event.pubkey.to_string(); - match kv_store.get(key).json::>().await { - Ok(Some(current)) => { - let new_events: Vec = current.into_iter().filter(|e| e.id != event.id).collect(); - - // save new vector of events - kv_store.put(key, new_events)?.execute().await?; - console_log!("deleted nwc response event: {}", event.id); - } - Ok(None) => return Ok(()), - Err(e) => { - console_log!("error getting nwc events from KV: {e}"); - return Ok(()); - } - }; - - Ok(()) -} - // Helper function to extend a vector without duplicates fn extend_without_duplicates(master: &mut Vec, new: &Vec) { for item in new { diff --git a/wrangler.toml b/wrangler.toml index c7f2a3e..e060212 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -29,6 +29,16 @@ kv_namespaces = [ { binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" }, ] +[[d1_databases]] +binding = "DB" +database_name = "blastr-db" +database_id = "82736a30-841d-4c24-87d8-788763dacb01" + +[[env.staging.d1_databases]] +binding = "DB" +database_name = "blastr-db-staging" +database_id = "ba4c227b-edf2-46a4-99fa-4b7bfa036800" + [env.staging.vars] WORKERS_RS_VERSION = "0.0.18" ENVIRONMENT = "staging"