Skip to content

Commit

Permalink
Implement nostr event db
Browse files Browse the repository at this point in the history
  • Loading branch information
TonyGiorgio committed Aug 11, 2023
1 parent faa2cc2 commit 167b9fc
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ default = ["console_error_panic_hook"]

[dependencies]
cfg-if = "0.1.2"
worker = { version = "0.0.18", features = ["queue"] }
worker = { version = "0.0.18", features = ["queue", "d1"] }
futures = "0.3.26"
futures-util = { version = "0.3", default-features = false }
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "1.0.67"

# The `console_error_panic_hook` crate provides better debugging of panics by
# logging them with `console.error`. This is great for development, but requires
Expand Down
37 changes: 37 additions & 0 deletions migrations/0000_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Migration number: 0000 2023-08-10T16:43:44.275Z
-- Event Table
CREATE TABLE IF NOT EXISTS event (
    event_id BLOB PRIMARY KEY,         -- The event's unique id. NOTE(review): comment said "4-byte hash", but nostr event ids are 32-byte sha256 digests — confirm intended width.
    created_at INTEGER NOT NULL,       -- When the event was authored.
    pubkey BLOB NOT NULL,              -- Author's public key.
    kind INTEGER NOT NULL,             -- Event kind/type.
    content TEXT NOT NULL,             -- Serialized json of event object. (fix: comma was missing here, which made this CREATE TABLE a syntax error)
    sig BLOB NOT NULL,                 -- Signature
    deleted INTEGER NOT NULL DEFAULT 0 -- Soft delete flag, 0 = not deleted, 1 = deleted
);

-- Tag Table
CREATE TABLE IF NOT EXISTS tag (
    id INTEGER PRIMARY KEY,            -- Auto-incrementing ID for tags.
    event_id BLOB NOT NULL,            -- Link to the event that the tag is associated with.
    name TEXT NOT NULL,                -- The tag's name.
    value TEXT,                        -- The tag's value (nullable).
    FOREIGN KEY(event_id) REFERENCES event(event_id) ON DELETE CASCADE
);

-- Indexes for faster queries

-- Index to speed up look-ups by the pubkey of an event.
CREATE INDEX IF NOT EXISTS pubkey_index ON event(pubkey);

-- Index to speed up look-ups by the kind/type of an event.
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);

-- Index to link tags to events.
CREATE INDEX IF NOT EXISTS tag_event_index ON tag(event_id);

-- Index to speed up look-ups by tag names.
CREATE INDEX IF NOT EXISTS tag_name_index ON tag(name);

-- Index to speed up look-ups by tag values.
CREATE INDEX IF NOT EXISTS tag_value_index ON tag(value);
215 changes: 215 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use std::collections::HashMap;

use ::nostr::{Event, Kind, RelayMessage};
use nostr::{EventId, Tag, TagKind};
use serde_json::Value;
use worker::D1Database;
use worker::*;

/// Fetch all non-deleted NWC (Nostr Wallet Connect) events authored by any of
/// `keys`, restricted to the given `kind`, with their tags attached from the
/// `tag` table.
///
/// Returns an empty vec immediately for any kind other than
/// `WalletConnectRequest` / `WalletConnectResponse`, since only those are stored.
pub async fn get_nwc_events(keys: &[String], kind: Kind, db: &D1Database) -> Result<Vec<Event>> {
    // Determine the event kind — short-circuit anything that isn't NWC traffic.
    match kind {
        Kind::WalletConnectResponse => (),
        Kind::WalletConnectRequest => (),
        _ => return Ok(vec![]), // skip other event types
    };

    // Convert keys to a format SQLite can understand for the IN clause:
    // one "?" placeholder per key.
    let placeholders: Vec<String> = keys.iter().map(|_| "?".to_string()).collect();
    let joined_placeholders = placeholders.join(", ");

    // NOTE(review): `keys` is passed as a single macro argument covering N
    // placeholders, and `kind` is bound directly — confirm the worker `query!`
    // macro spreads the slice across the placeholders and serializes `Kind`
    // to the INTEGER form the schema stores.
    let stmt = worker::query!(
        db,
        &format!(
            r#"
            SELECT * FROM event
            WHERE pubkey IN ({}) AND kind = ? AND deleted = 0
            "#,
            joined_placeholders
        ),
        keys,
        kind
    )?;

    let result = stmt
        .all()
        .await
        .map_err(|e| format!("Failed to fetch nwc events: {}", e))?;

    // Deserialize each row into a nostr `Event`, failing the whole call on the
    // first row that doesn't decode.
    // NOTE(review): `SELECT *` yields raw table columns — confirm that shape
    // actually matches `Event`'s serde representation (tags are absent here and
    // filled in below).
    let mut events: Vec<Event> = result
        .results::<Value>()?
        .iter()
        .map(|row| {
            serde_json::from_value(row.clone()).map_err(|e| {
                worker::Error::from(format!(
                    "Failed to deserialize event from row ({}): {}",
                    row, e
                ))
            })
        })
        .collect::<Result<Vec<Event>>>()?;

    // Fetch all tags for the fetched events in a single IN-clause query.
    let event_ids: Vec<EventId> = events.iter().map(|e| e.id.clone()).collect();
    let tag_stmt = worker::query!(
        db,
        &format!(
            r#"
            SELECT event_id, name, value FROM tag
            WHERE event_id IN ({})
            "#,
            event_ids
                .iter()
                .map(|_| "?".to_string())
                .collect::<Vec<_>>()
                .join(", ")
        ),
        &event_ids
    )?;

    let tag_result = tag_stmt
        .all()
        .await
        .map_err(|e| format!("Failed to fetch tags: {}", e))?;

    // Group the tag rows by their owning event id.
    // NOTE(review): rows decode as (EventId, String, String), but the schema
    // allows `tag.value` to be NULL — a NULL value would fail to decode here;
    // confirm whether NULL-valued tags can exist.
    let mut tags_map: HashMap<EventId, Vec<Tag>> = HashMap::new();
    for row in tag_result.results::<(EventId, String, String)>()? {
        let (event_id, name, value) = row;
        // Tags that fail to parse are silently dropped.
        if let Ok(tag) = Tag::parse(vec![name, value]) {
            tags_map.entry(event_id).or_insert_with(Vec::new).push(tag);
        }
    }

    // Attach each event's tags to it.
    for event in &mut events {
        if let Some(tags) = tags_map.remove(&event.id) {
            event.tags.extend(tags);
        }
    }

    Ok(events)
}

/// Persist an NWC event and all of its tags to the database in one D1 batch.
///
/// Returns `RelayMessage::new_ok(.., true, ..)` on success, or an OK=false
/// relay message carrying the database error text when any insert fails.
pub async fn handle_nwc_event(event: Event, db: &D1Database) -> Result<Option<RelayMessage>> {
    // Create the main event insertion query.
    // `deleted` is intentionally omitted; it takes the schema default of 0.
    let event_insert_query = worker::query!(
        db,
        r#"
        INSERT INTO event (event_id, created_at, pubkey, kind, content, sig)
        VALUES (?, ?, ?, ?, ?, ?)
        "#,
        &event.id,
        &event.created_at,
        &event.pubkey,
        &event.kind,
        &event.content,
        &event.sig
    )?;

    // Create a vector of tag insertion queries, one per tag on the event.
    // NOTE(review): only the first value (`as_vec().get(1)`) of each tag is
    // stored, so multi-value tags lose their remaining entries — confirm that
    // is intended for NWC tags.
    let mut tag_insert_queries: Vec<_> = event
        .tags
        .iter()
        .map(|tag| {
            worker::query!(
                db,
                r#"
                INSERT INTO tag (event_id, name, value)
                VALUES (?, ?, ?)
                "#,
                &event.id,
                &tag.kind().to_string(),
                &tag.as_vec().get(1)
            )
            // Binding literals to a fixed statement; failure here is a bug.
            .expect("should compile query")
        })
        .collect();

    // Combine the main event and tag insertion queries — event insert first,
    // so its result is the first element of the batch results below.
    let mut batch_queries = vec![event_insert_query];
    batch_queries.append(&mut tag_insert_queries);

    // Run the batch queries; results come back in submission order.
    let mut results = db.batch(batch_queries).await?.into_iter();

    // Check the result of the main event insertion (first in the batch).
    if let Some(error_msg) = results.next().and_then(|res| res.error()) {
        console_log!("error saving nwc event to event table: {}", error_msg);
        let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg);
        return Ok(Some(relay_msg));
    }

    // Check the results for the tag insertions; report the first failure.
    for tag_insert_result in results {
        if let Some(error_msg) = tag_insert_result.error() {
            console_log!("error saving tag to tag table: {}", error_msg);
            let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg);
            return Ok(Some(relay_msg));
        }
    }

    // Everything persisted — acknowledge with OK=true and an empty message.
    let relay_msg = RelayMessage::new_ok(event.id, true, "");
    Ok(Some(relay_msg))
}

/// When a NWC request has been fulfilled, soft delete the request from the database
/// When a NWC request has been fulfilled, soft delete the request from the database.
///
/// The fulfilled request is identified from the response event's `p` tag
/// (request author) and `e` tag (request event id). Database errors are logged
/// and swallowed on purpose — deletion is best-effort.
pub async fn delete_nwc_request(event: Event, db: &D1Database) -> Result<()> {
    // Only wallet-connect responses signal a fulfilled request; ignore the rest.
    if !matches!(event.kind, Kind::WalletConnectResponse) {
        return Ok(());
    }

    let author_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned();
    let request_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned();

    // Both tags must be present and of the expected variant to locate the row.
    if let (Some(Tag::PubKey(author, ..)), Some(Tag::Event(request_id, ..))) =
        (author_tag, request_tag)
    {
        // Soft delete the request row matching (pubkey, event_id).
        let update = worker::query!(
            db,
            "UPDATE event SET deleted = 1 WHERE pubkey = ? AND event_id = ?",
            &author.to_string(),
            &request_id
        )?;
        if let Err(e) = update.run().await {
            console_log!("error soft deleting nwc event from database: {e}");
        }
    }

    Ok(())
}

/// When a NWC response has been fulfilled, soft delete the response from the database
pub async fn delete_nwc_response(event: &Event, db: &D1Database) -> Result<()> {
// Filter only relevant events
match event.kind {
Kind::WalletConnectResponse => (),
_ => return Ok(()), // skip other event types
};

// Soft delete the event based on pubkey and event_id
match worker::query!(
db,
"UPDATE event SET deleted = 1 WHERE pubkey = ? AND event_id = ?",
&event.pubkey.to_string(),
&event.id
)?
.run()
.await
{
Ok(_) => console_log!("soft deleted nwc response event: {}", event.id),
Err(e) => {
console_log!("error soft deleting nwc event from database: {e}");
return Ok(());
}
}

Ok(())
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use worker::*;

mod db;
mod error;
mod nostr;
mod utils;
Expand Down
10 changes: 10 additions & 0 deletions wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ routes = [
{ pattern = "nostr.mutinywallet.com/event", zone_id = "2b9268714ce8d1c4431e8046d4ba55d3" }
]

[[d1_databases]]
binding = "DB"
database_name = "blastr-db"
database_id = "82736a30-841d-4c24-87d8-788763dacb01"

# replace with your KV store info
# create the queues with `wrangler kv:namespace create PUBLISHED_NOTES` and the same command with the `--preview` flag.
# put your queue IDs below
Expand All @@ -29,6 +34,11 @@ kv_namespaces = [
{ binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" },
]

[[env.staging.d1_databases]]
binding = "DB"
database_name = "blastr-db-staging"
database_id = "ba4c227b-edf2-46a4-99fa-4b7bfa036800"

[env.staging.vars]
WORKERS_RS_VERSION = "0.0.18"
ENVIRONMENT = "staging"
Expand Down

0 comments on commit 167b9fc

Please sign in to comment.