From fa7c375595c5dde92935c53677679eeb96a30e02 Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Wed, 9 Jun 2021 13:52:04 +1000 Subject: [PATCH 1/9] Make file deletion async; use in-memory database in tests --- src/storage.rs | 104 ++++++++++++++++++++++++++++++------------------- src/tests.rs | 53 ++++++++++++------------- 2 files changed, 88 insertions(+), 69 deletions(-) diff --git a/src/storage.rs b/src/storage.rs index 9cfe653..00d9608 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::fs; use std::path::Path; use std::sync::Mutex; @@ -84,7 +83,7 @@ pub fn create_database_if_needed(room_id: &str) { create_room_tables_if_needed(&conn); } -fn create_room_tables_if_needed(conn: &DatabaseConnection) { +pub fn create_room_tables_if_needed(conn: &DatabaseConnection) { // Messages // The `id` field is needed to make `rowid` stable, which is important because otherwise // the `id`s in this table won't correspond to those in the deleted messages table @@ -248,48 +247,39 @@ async fn prune_pending_tokens() { info!("Pruned pending tokens."); } -pub async fn prune_files(file_expiration: i64) { - // The expiration setting is passed in for testing purposes - let rooms = match get_all_room_ids() { - Ok(rooms) => rooms, - Err(_) => return, - }; - for room in rooms { - // It's not catastrophic if we fail to prune the database for a given room - let pool = pool_by_room_id(&room); - let now = chrono::Utc::now().timestamp(); - let expiration = now - file_expiration; - // Get a database connection and open a transaction - let conn = match pool.get() { - Ok(conn) => conn, - Err(e) => { - return error!( - "Couldn't get database connection to prune files due to error: {}.", - e - ) - } - }; - // Get the IDs of the files to delete - let raw_query = format!("SELECT id FROM {} WHERE timestamp < (?1)", FILES_TABLE); - let mut query = match conn.prepare(&raw_query) { - Ok(query) => query, - Err(e) => return error!("Couldn't prepare query to prune files due to error: {}.", e), - }; - let rows = match query.query_map(params![expiration], |row| row.get(0)) { - Ok(rows) => rows, - Err(e) => { - return error!( - "Couldn't prune files due to error: {} (expiration = {}).", - e, expiration - ); - } - }; - let ids: Vec = rows.filter_map(|result| result.ok()).collect(); - if !ids.is_empty() { +fn get_expired_file_ids( + pool: &DatabaseConnectionPool, file_expiration: i64, +) -> Result, ()> { + let now = chrono::Utc::now().timestamp(); + let expiration = now - file_expiration; + // Get a database connection and open a transaction + let conn = pool.get().map_err(|e| { + error!("Couldn't get database connection to prune files due to error: {}.", e); + })?; + // Get the IDs of the files to delete + let raw_query = format!("SELECT id FROM {} WHERE timestamp < (?1)", FILES_TABLE); + + let mut query = conn.prepare(&raw_query).map_err(|e| { + error!("Couldn't prepare query to prune files due to error: {}.", e); + })?; + + let rows = query.query_map(params![expiration], |row| row.get(0)).map_err(|e| { + error!("Couldn't prune files due to error: {} (expiration = {}).", e, expiration); + })?; + + Ok(rows.filter_map(|result| result.ok()).collect()) +} + +pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, file_expiration: i64) { + let ids = get_expired_file_ids(&pool, file_expiration); + + match ids { + Ok(ids) if !ids.is_empty() => { // Delete the files let mut deleted_ids: Vec = vec![]; + for id in ids { - match fs::remove_file(format!("files/{}_files/{}", room, id)) { + match tokio::fs::remove_file(format!("files/{}_files/{}", room, id)).await { Ok(_) => deleted_ids.push(id), Err(e) => { error!( @@ -300,6 +290,17 @@ pub async fn prune_files(file_expiration: i64) { } } } + + let conn = match pool.get() { + Ok(conn) => conn, + Err(e) => { + return error!( + "Couldn't get database connection to prune files due to error: {}.", + e + ) + } + }; + // Remove the file records from the database // FIXME: It'd be great to do this in a single statement, but apparently this is not supported very well for id in deleted_ids { @@ -314,9 +315,30 @@ pub async fn prune_files(file_expiration: i64) { // Log the result info!("Pruned files for room: {}.", room); } + Ok(_) => { + // empty + } + Err(_) => { + // It's not catastrophic if we fail to prune the database for a given room + } } } +pub async fn prune_files(file_expiration: i64) { + // The expiration setting is passed in for testing purposes + let rooms = match get_all_room_ids() { + Ok(rooms) => rooms, + Err(_) => return, + }; + + let futs = rooms.into_iter().map(|room| async move { + let pool = pool_by_room_id(&room); + prune_files_for_room(&pool, &room, file_expiration).await; + }); + + futures::future::join_all(futs).await; +} + // Migration pub fn perform_migration() { diff --git a/src/tests.rs b/src/tests.rs index 8bf2532..09ecd4c 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,37 +1,34 @@ use std::collections::HashMap; use std::fs; -use std::path::Path; use rand::{thread_rng, Rng}; use rusqlite::params; +use rusqlite::OpenFlags; use warp::http::StatusCode; +use crate::storage::DatabaseConnectionPool; + use super::crypto; use super::handlers; -use super::models; use super::storage; -fn perform_main_setup() { - storage::create_main_database_if_needed(); - fs::create_dir_all("rooms").unwrap(); - fs::create_dir_all("files").unwrap(); -} +async fn set_up_test_room() -> DatabaseConnectionPool { + let manager = r2d2_sqlite::SqliteConnectionManager::file("file::memory:?cache=shared"); + let mut flags = OpenFlags::default(); + flags.set(OpenFlags::SQLITE_OPEN_URI, true); -async fn set_up_test_room() { - perform_main_setup(); - let test_room_id = "test_room"; - let test_room_name = "Test Room"; - let test_room = models::Room { id: test_room_id.to_string(), name: test_room_name.to_string() }; - handlers::create_room(test_room).await.unwrap(); - let raw_path = format!("rooms/{}.db", test_room_id); - let path = Path::new(&raw_path); - fs::read(path).unwrap(); // Fail if this doesn't exist + let manager = manager.with_flags(flags); + + let pool = r2d2::Pool::::new(manager).unwrap(); + + let conn = pool.get().unwrap(); + + storage::create_room_tables_if_needed(&conn); + + pool } -fn get_auth_token() -> (String, String) { - // Get a database connection pool - let test_room_id = "test_room"; - let pool = storage::pool_by_room_id(&test_room_id); +fn get_auth_token(pool: &DatabaseConnectionPool) -> (String, String) { // Generate a fake user key pair let (user_private_key, user_public_key) = crypto::generate_x25519_key_pair(); let hex_user_public_key = format!("05{}", hex::encode(user_public_key.to_bytes())); @@ -57,12 +54,12 @@ fn get_auth_token() -> (String, String) { #[tokio::test] async fn test_authorization() { // Ensure the test room is set up and get a database connection pool - set_up_test_room().await; - let test_room_id = "test_room"; - let pool = storage::pool_by_room_id(&test_room_id); + + let pool = set_up_test_room().await; + // Get an auth token // This tests claiming a token internally - let (_, hex_user_public_key) = get_auth_token(); + let (_, hex_user_public_key) = get_auth_token(&pool); // Try to claim an incorrect token let mut incorrect_token = [0u8; 48]; thread_rng().fill(&mut incorrect_token[..]); @@ -76,11 +73,11 @@ async fn test_authorization() { #[tokio::test] async fn test_file_handling() { // Ensure the test room is set up and get a database connection pool - set_up_test_room().await; + let pool = set_up_test_room().await; + let test_room_id = "test_room"; - let pool = storage::pool_by_room_id(&test_room_id); // Get an auth token - let (auth_token, _) = get_auth_token(); + let (auth_token, _) = get_auth_token(&pool); // Store the test file handlers::store_file( Some(test_room_id.to_string()), @@ -105,7 +102,7 @@ async fn test_file_handling() { assert_eq!(base64_encoded_file, TEST_FILE); // Prune the file and check that it's gone // Will evaluate to now + 60 - storage::prune_files(-60).await; + storage::prune_files_for_room(&pool, test_room_id, -60).await; // It should be gone now fs::read(format!("files/{}_files/{}", test_room_id, id)).unwrap_err(); // Check that the file record is also gone From e6b9b8d42f20a59dd7dcfb1589660084f51931e1 Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Thu, 10 Jun 2021 11:42:45 +1000 Subject: [PATCH 2/9] Delete all expired files in parallel --- src/storage.rs | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/storage.rs b/src/storage.rs index 00d9608..45b46b5 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -276,18 +276,21 @@ pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, fil match ids { Ok(ids) if !ids.is_empty() => { // Delete the files - let mut deleted_ids: Vec = vec![]; - - for id in ids { - match tokio::fs::remove_file(format!("files/{}_files/{}", room, id)).await { - Ok(_) => deleted_ids.push(id), - Err(e) => { - error!( - "Couldn't delete file: {} from room: {} due to error: {}.", - id, room, e - ); - deleted_ids.push(id); - } + let futs = ids.iter().map(|id| async move { + ( + tokio::fs::remove_file(format!("files/{}_files/{}", room, id)).await, + id.to_owned(), + ) + }); + + let results = futures::future::join_all(futs).await; + + for (res, id) in results { + if let Err(err) = res { + error!( + "Couldn't delete file: {} from room: {} due to error: {}.", + id, room, err + ); } } @@ -301,9 +304,13 @@ pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, fil } }; + // Measure the time it takes to delete all files sequentially + // (this might become a problem since we're not using an async interface) + let now = std::time::Instant::now(); + // Remove the file records from the database // FIXME: It'd be great to do this in a single statement, but apparently this is not supported very well - for id in deleted_ids { + for id in ids { let stmt = format!("DELETE FROM {} WHERE id = (?1)", FILES_TABLE); match conn.execute(&stmt, params![id]) { Ok(_) => (), @@ -313,7 +320,7 @@ pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, fil }; } // Log the result - info!("Pruned files for room: {}.", room); + info!("Pruned files for room: {}. Took: {:?}", room, now.elapsed()); } Ok(_) => { // empty From 09baab90b819530fadc91603299d283112994621 Mon Sep 17 00:00:00 2001 From: Sean Date: Thu, 9 Sep 2021 11:12:36 +1000 Subject: [PATCH 3/9] Update BUILDING.md --- BUILDING.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/BUILDING.md b/BUILDING.md index 7efeaf5..1684703 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -18,18 +18,22 @@ Make sure you're pointing to the right openssl installation (e.g. macOS provides ### Step 2: Build the project +The Linux Rust installer assumes that you already have a C linker installed. If this is not the case you'll see `error: linker 'cc' not found`. To fix this, run: + ``` -cargo build --release +sudo apt update +sudo apt upgrade +sudo apt install build-essential libssl-dev pkg-config ``` -The Linux Rust installer assumes that you already have a C linker installed. If this is not the case you'll see `error: linker 'cc' not found`. To fix this, run: +Build it with ``` -apt update -sudo apt install build-essential +cargo build --release ``` ### Step 3: Run it +The two files generated in step 1 should be copied to the same directory as the executable. Alternatively you can use the command line arguments below to specify their locations. The executable needs both the x25519-public-key and the x25519-private-key to run. ``` ./target/release/session-open-group-server From 66e5a61742f4ac49c74900b079d9a278fa4305a8 Mon Sep 17 00:00:00 2001 From: Sean Date: Thu, 9 Sep 2021 11:34:46 +1000 Subject: [PATCH 4/9] Update BUILDING.md Co-authored-by: Jason Rhinelander --- BUILDING.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/BUILDING.md b/BUILDING.md index 1684703..2dcbb0e 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -26,6 +26,8 @@ sudo apt upgrade sudo apt install build-essential libssl-dev pkg-config ``` +(Or the equivalent on a non-Debian-based Linux system). + Build it with ``` From f242a8273864d49c740cfd802ec3517a0349ab5f Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 7 Sep 2021 14:19:05 -0300 Subject: [PATCH 5/9] Remove table-names-in-global-variables antipattern These make the code more complex and make the the queries harder to read. Tables in global variables is an antipattern that stemmed from old PHP (or similar) web hosts that came with a single MySQL table requiring you use different prefixes to load multiple instances in the same table. Without mysql or prefix configurability it's an antipattern that just results in less readable code. --- src/handlers.rs | 126 ++++++++++++++++++------------------------------ src/storage.rs | 101 ++++++++++++++------------------------ src/tests.rs | 4 +- 3 files changed, 84 insertions(+), 147 deletions(-) diff --git a/src/handlers.rs b/src/handlers.rs index d30bddb..561bd1e 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -43,7 +43,7 @@ pub async fn create_room(room: models::Room) -> Result { let pool = &storage::MAIN_POOL; let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the room - let stmt = format!("REPLACE INTO {} (id, name) VALUES (?1, ?2)", storage::MAIN_TABLE); + let stmt = "REPLACE INTO main (id, name) VALUES (?1, ?2)"; match conn.execute(&stmt, params![&room.id, &room.name]) { Ok(_) => (), Err(e) => { @@ -65,7 +65,7 @@ pub async fn delete_room(id: String) -> Result { let pool = &storage::MAIN_POOL; let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the room - let stmt = format!("DELETE FROM {} WHERE id = (?1)", storage::MAIN_TABLE); + let stmt = "DELETE FROM main WHERE id = (?1)"; match conn.execute(&stmt, params![&id]) { Ok(_) => (), Err(e) => { @@ -85,7 +85,7 @@ pub fn get_room(room_id: &str) -> Result { let pool = &storage::MAIN_POOL; let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Get the room info if possible - let raw_query = format!("SELECT id, name FROM {} where id = (?1)", storage::MAIN_TABLE); + let raw_query = "SELECT id, name FROM main where id = (?1)"; let room = match conn.query_row(&raw_query, params![room_id], |row| { Ok(models::Room { id: row.get(0)?, name: row.get(1)? }) }) { @@ -107,7 +107,7 @@ pub fn get_all_rooms() -> Result { let pool = &storage::MAIN_POOL; let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Get the room info if possible - let raw_query = format!("SELECT id, name FROM {}", storage::MAIN_TABLE); + let raw_query = "SELECT id, name FROM main"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query .query_map(params![], |row| Ok(models::Room { id: row.get(0)?, name: row.get(1)? })) @@ -165,7 +165,7 @@ pub async fn store_file( let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // INSERT rather than REPLACE so that on the off chance there's already a file with this exact // id (i.e. timestamp) we simply error out and get the client to retry. - let stmt = format!("INSERT INTO {} (id, timestamp) VALUES (?1, ?2)", storage::FILES_TABLE); + let stmt = "INSERT INTO files (id, timestamp) VALUES (?1, ?2)"; let _ = match conn.execute(&stmt, params![id.to_string(), now]) { Ok(rows) => rows, Err(e) => { @@ -361,10 +361,8 @@ pub fn get_auth_token_challenge( // Note that a given public key can have multiple pending tokens let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let now = chrono::Utc::now().timestamp(); - let stmt = format!( - "INSERT INTO {} (public_key, timestamp, token) VALUES (?1, ?2, ?3)", - storage::PENDING_TOKENS_TABLE - ); + let stmt = + "INSERT INTO pending_tokens (public_key, timestamp, token) VALUES (?1, ?2, ?3)"; let _ = match conn.execute(&stmt, params![hex_public_key, now, token]) { Ok(rows) => rows, Err(e) => { @@ -406,10 +404,8 @@ pub fn claim_auth_token( .ok_or(Error::Unauthorized)?; let token = &pending_tokens[index].1; // Store the claimed token - let stmt = format!( - "INSERT INTO {} (public_key, timestamp, token) VALUES (?1, ?2, ?3)", - storage::TOKENS_TABLE - ); + let stmt = + "INSERT INTO tokens (public_key, timestamp, token) VALUES (?1, ?2, ?3)"; let now = chrono::Utc::now().timestamp(); match conn.execute(&stmt, params![public_key, now, hex::encode(token)]) { Ok(_) => (), @@ -419,7 +415,7 @@ pub fn claim_auth_token( } } // Delete all pending tokens for the given public key - let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::PENDING_TOKENS_TABLE); + let stmt = "DELETE FROM pending_tokens WHERE public_key = (?1)"; match conn.execute(&stmt, params![public_key]) { Ok(_) => (), Err(e) => error!("Couldn't delete pending tokens due to error: {}.", e), // It's not catastrophic if this fails @@ -441,7 +437,7 @@ pub fn delete_auth_token( // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Delete the token - let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::TOKENS_TABLE); + let stmt = "DELETE FROM tokens WHERE public_key = (?1)"; match conn.execute(&stmt, params![requesting_public_key]) { Ok(_) => (), Err(e) => { @@ -493,10 +489,8 @@ pub fn insert_message( } // Insert the message message.timestamp = timestamp; - let stmt = format!( - "INSERT INTO {} (public_key, timestamp, data, signature, is_deleted) VALUES (?1, ?2, ?3, ?4, ?5)", - storage::MESSAGES_TABLE - ); + let stmt = + "INSERT INTO messages (public_key, timestamp, data, signature, is_deleted) VALUES (?1, ?2, ?3, ?4, ?5)"; match tx.execute( &stmt, params![&requesting_public_key, message.timestamp, message.data, message.signature, 0], @@ -526,10 +520,8 @@ fn get_last_5_messages( public_key: &str, pool: &storage::DatabaseConnectionPool, ) -> Result, Rejection> { let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let raw_query = format!( - "SELECT id, public_key, timestamp, data, signature FROM {} WHERE public_key = (?1) ORDER BY timestamp DESC LIMIT 5", - storage::MESSAGES_TABLE - ); + let raw_query = + "SELECT id, public_key, timestamp, data, signature FROM messages WHERE public_key = (?1) ORDER BY timestamp DESC LIMIT 5"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![public_key], |row| { Ok(models::Message { @@ -575,14 +567,11 @@ pub fn get_messages( limit = 256; } // Query the database - let raw_query: String; + let raw_query: &str; if query_params.get("from_server_id").is_some() { - raw_query = format!("SELECT id, public_key, timestamp, data, signature FROM {} WHERE id > (?1) AND is_deleted = 0 ORDER BY id ASC LIMIT (?2)", storage::MESSAGES_TABLE); + raw_query = "SELECT id, public_key, timestamp, data, signature FROM messages WHERE id > (?1) AND is_deleted = 0 ORDER BY id ASC LIMIT (?2)"; } else { - raw_query = format!( - "SELECT id, public_key, timestamp, data, signature FROM {} WHERE is_deleted = 0 ORDER BY id DESC LIMIT (?2)", - storage::MESSAGES_TABLE - ); + raw_query = "SELECT id, public_key, timestamp, data, signature FROM messages WHERE is_deleted = 0 ORDER BY id DESC LIMIT (?2)"; } let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![from_server_id, limit], |row| { @@ -617,10 +606,8 @@ fn update_usage_statistics( let public_key = get_public_key_for_auth_token(auth_token, pool)?; let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let now = chrono::Utc::now().timestamp(); - let stmt = format!( - "INSERT OR REPLACE INTO {} (public_key, last_active) VALUES(?1, ?2)", - storage::USER_ACTIVITY_TABLE - ); + let stmt = + "INSERT OR REPLACE INTO user_activity (public_key, last_active) VALUES(?1, ?2)"; conn.execute(&stmt, params![public_key, now]).map_err(|_| Error::DatabaseFailedInternally)?; return Ok(()); } @@ -658,8 +645,7 @@ pub fn delete_message( // Check that the requesting user is either the sender of the message or a moderator let sender_option: Option = { let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let raw_query = - format!("SELECT public_key FROM {} WHERE id = (?1)", storage::MESSAGES_TABLE); + let raw_query = "SELECT public_key FROM messages WHERE id = (?1)"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![id], |row| row.get(0)) { Ok(rows) => rows, @@ -680,7 +666,7 @@ pub fn delete_message( let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?; // Delete the message if it's present - let stmt = format!("UPDATE {} SET public_key = 'deleted', timestamp = 0, data = 'deleted', signature = 'deleted', is_deleted = 1 WHERE id = (?1)", storage::MESSAGES_TABLE); + let stmt = "UPDATE messages SET public_key = 'deleted', timestamp = 0, data = 'deleted', signature = 'deleted', is_deleted = 1 WHERE id = (?1)"; let count = match tx.execute(&stmt, params![id]) { Ok(count) => count, Err(e) => { @@ -690,10 +676,8 @@ pub fn delete_message( }; // Update the deletions table if needed if count > 0 { - let stmt = format!( - "INSERT INTO {} (deleted_message_id) VALUES (?1)", - storage::DELETED_MESSAGES_TABLE - ); + let stmt = + "INSERT INTO deleted_messages (deleted_message_id) VALUES (?1)"; match tx.execute(&stmt, params![id]) { Ok(_) => (), Err(e) => { @@ -735,19 +719,13 @@ pub fn get_deleted_messages( limit = 256; } // Query the database - let raw_query: String; + let raw_query: &str; if query_params.get("from_server_id").is_some() { - raw_query = format!( - "SELECT id, deleted_message_id FROM {} WHERE id > (?1) ORDER BY id ASC LIMIT (?2)", - storage::DELETED_MESSAGES_TABLE - ); + raw_query = "SELECT id, deleted_message_id FROM deleted_messages WHERE id > (?1) ORDER BY id ASC LIMIT (?2)"; } else { - raw_query = format!( - "SELECT id, deleted_message_id FROM {} ORDER BY id DESC LIMIT (?2)", - storage::DELETED_MESSAGES_TABLE - ); + raw_query = "SELECT id, deleted_message_id FROM deleted_messages ORDER BY id DESC LIMIT (?2)"; } - let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; + let mut query = conn.prepare(raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![from_server_id, limit], |row| { Ok(models::DeletedMessage { id: row.get(0)?, deleted_message_id: row.get(1)? }) }) { @@ -785,7 +763,7 @@ pub async fn add_moderator( let pool = storage::pool_by_room_id(&body.room_id); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the moderator - let stmt = format!("INSERT INTO {} (public_key) VALUES (?1)", storage::MODERATORS_TABLE); + let stmt = "INSERT INTO moderators (public_key) VALUES (?1)"; match conn.execute(&stmt, params![&body.public_key]) { Ok(_) => (), Err(e) => { @@ -819,7 +797,7 @@ pub async fn delete_moderator( let pool = storage::pool_by_room_id(&body.room_id); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the moderator - let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::MODERATORS_TABLE); + let stmt = "DELETE FROM moderators WHERE public_key = (?1)"; match conn.execute(&stmt, params![&body.public_key]) { Ok(_) => (), Err(e) => { @@ -868,10 +846,8 @@ pub fn ban_and_delete_all_messages( ban(public_key, auth_token, pool)?; // Get the IDs of the messages to delete let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let raw_query = format!( - "SELECT id FROM {} WHERE public_key = (?1) AND is_deleted = 0", - storage::MESSAGES_TABLE - ); + let raw_query = + "SELECT id FROM messages WHERE public_key = (?1) AND is_deleted = 0"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![public_key], |row| Ok(row.get(0)?)) { Ok(rows) => rows, @@ -910,7 +886,7 @@ pub fn ban( // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the message - let stmt = format!("INSERT INTO {} (public_key) VALUES (?1)", storage::BLOCK_LIST_TABLE); + let stmt = "INSERT INTO block_list (public_key) VALUES (?1)"; match conn.execute(&stmt, params![public_key]) { Ok(_) => (), Err(e) => { @@ -945,7 +921,7 @@ pub fn unban( // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the message - let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::BLOCK_LIST_TABLE); + let stmt = "DELETE FROM block_list WHERE public_key = (?1)"; match conn.execute(&stmt, params![public_key]) { Ok(_) => (), Err(e) => { @@ -993,7 +969,7 @@ pub fn get_member_count( // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database - let raw_query = format!("SELECT COUNT(DISTINCT public_key) FROM {}", storage::TOKENS_TABLE); + let raw_query = "SELECT COUNT(DISTINCT public_key) FROM tokens"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![], |row| row.get(0)) { Ok(rows) => rows, @@ -1032,7 +1008,7 @@ pub fn compact_poll( from_deletion_server_id, } = request_body; // Check that the room hasn't been deleted - let raw_query = format!("SELECT id, name FROM {} where id = (?1)", storage::MAIN_TABLE); + let raw_query = "SELECT id, name FROM main where id = (?1)"; match main_conn.query_row(&raw_query, params![room_id], |row| { Ok(models::Room { id: row.get(0)?, name: row.get(1)? }) }) { @@ -1177,10 +1153,8 @@ pub async fn get_stats_for_room( let pool = storage::pool_by_room_id(&room); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let raw_query_users = format!( - "SELECT COUNT(public_key) FROM {} WHERE last_active > ?1 AND last_active <= ?2", - storage::USER_ACTIVITY_TABLE - ); + let raw_query_users = + "SELECT COUNT(public_key) FROM user_activity WHERE last_active > ?1 AND last_active <= ?2"; let mut query_users = conn.prepare(&raw_query_users).map_err(|_| Error::DatabaseFailedInternally)?; @@ -1191,10 +1165,8 @@ pub async fn get_stats_for_room( Err(_e) => return Err(warp::reject::custom(Error::DatabaseFailedInternally)), }; - let raw_query_posts = format!( - "SELECT COUNT(id) FROM {} WHERE timestamp >= ?1 AND timestamp <= ?2", - storage::MESSAGES_TABLE - ); + let raw_query_posts = + "SELECT COUNT(id) FROM messages WHERE timestamp >= ?1 AND timestamp <= ?2"; let mut query_posts = conn.prepare(&raw_query_posts).map_err(|_| Error::DatabaseFailedInternally)?; @@ -1222,10 +1194,8 @@ fn get_pending_tokens( public_key: &str, pool: &storage::DatabaseConnectionPool, ) -> Result)>, Rejection> { let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let raw_query = format!( - "SELECT timestamp, token FROM {} WHERE public_key = (?1) AND timestamp > (?2)", - storage::PENDING_TOKENS_TABLE - ); + let raw_query = + "SELECT timestamp, token FROM pending_tokens WHERE public_key = (?1) AND timestamp > (?2)"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let now = chrono::Utc::now().timestamp(); let expiration = now - storage::PENDING_TOKEN_EXPIRATION; @@ -1246,7 +1216,7 @@ fn get_moderators_vector(pool: &storage::DatabaseConnectionPool) -> Result rows, @@ -1272,7 +1242,7 @@ fn get_banned_public_keys_vector( // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database - let raw_query = format!("SELECT public_key FROM {}", storage::BLOCK_LIST_TABLE); + let raw_query = "SELECT public_key FROM block_list"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![], |row| row.get(0)) { Ok(rows) => rows, @@ -1289,10 +1259,8 @@ fn is_banned(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database - let raw_query = format!( - "SELECT COUNT(public_key) FROM {} WHERE public_key = (?1)", - storage::BLOCK_LIST_TABLE - ); + let raw_query = + "SELECT COUNT(public_key) FROM block_list WHERE public_key = (?1)"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![public_key], |row| row.get(0)) { Ok(rows) => rows, @@ -1327,7 +1295,7 @@ fn get_public_key_for_auth_token( // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database - let raw_query = format!("SELECT public_key FROM {} WHERE token = (?1)", storage::TOKENS_TABLE); + let raw_query = "SELECT public_key FROM tokens WHERE token = (?1)"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![auth_token], |row| row.get(0)) { Ok(rows) => rows, diff --git a/src/storage.rs b/src/storage.rs index 45b46b5..fd8a39e 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -14,8 +14,6 @@ pub type DatabaseConnectionPool = r2d2::Pool; // Main -pub const MAIN_TABLE: &str = "main"; - lazy_static::lazy_static! { pub static ref MAIN_POOL: DatabaseConnectionPool = { @@ -32,14 +30,12 @@ pub fn create_main_database_if_needed() { } fn create_main_tables_if_needed(conn: &DatabaseConnection) { - let main_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let main_table_cmd = + "CREATE TABLE IF NOT EXISTS main ( id TEXT PRIMARY KEY, name TEXT, image_id TEXT - )", - MAIN_TABLE - ); + )"; conn.execute(&main_table_cmd, params![]).expect("Couldn't create main table."); } @@ -49,15 +45,6 @@ pub const PENDING_TOKEN_EXPIRATION: i64 = 10 * 60; pub const TOKEN_EXPIRATION: i64 = 7 * 24 * 60 * 60; pub const FILE_EXPIRATION: i64 = 15 * 24 * 60 * 60; -pub const MESSAGES_TABLE: &str = "messages"; -pub const DELETED_MESSAGES_TABLE: &str = "deleted_messages"; -pub const MODERATORS_TABLE: &str = "moderators"; -pub const BLOCK_LIST_TABLE: &str = "block_list"; -pub const PENDING_TOKENS_TABLE: &str = "pending_tokens"; -pub const TOKENS_TABLE: &str = "tokens"; -pub const FILES_TABLE: &str = "files"; -pub const USER_ACTIVITY_TABLE: &str = "user_activity"; - lazy_static::lazy_static! { static ref POOLS: Mutex> = Mutex::new(HashMap::new()); @@ -87,84 +74,68 @@ pub fn create_room_tables_if_needed(conn: &DatabaseConnection) { // Messages // The `id` field is needed to make `rowid` stable, which is important because otherwise // the `id`s in this table won't correspond to those in the deleted messages table - let messages_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let messages_table_cmd = + "CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY, public_key TEXT, timestamp INTEGER, data TEXT, signature TEXT, is_deleted INTEGER - )", - MESSAGES_TABLE - ); + )"; conn.execute(&messages_table_cmd, params![]).expect("Couldn't create messages table."); // Deleted messages - let deleted_messages_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let deleted_messages_table_cmd = + "CREATE TABLE IF NOT EXISTS deleted_messages ( id INTEGER PRIMARY KEY, deleted_message_id INTEGER - )", - DELETED_MESSAGES_TABLE - ); + )"; conn.execute(&deleted_messages_table_cmd, params![]) .expect("Couldn't create deleted messages table."); // Moderators - let moderators_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let moderators_table_cmd = + "CREATE TABLE IF NOT EXISTS moderators ( public_key TEXT - )", - MODERATORS_TABLE - ); + )"; conn.execute(&moderators_table_cmd, params![]).expect("Couldn't create moderators table."); // Block list - let block_list_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let block_list_table_cmd = + "CREATE TABLE IF NOT EXISTS block_list ( public_key TEXT - )", - BLOCK_LIST_TABLE - ); + )"; conn.execute(&block_list_table_cmd, params![]).expect("Couldn't create block list table."); // Pending tokens // Note that a given public key can have multiple pending tokens - let pending_tokens_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let pending_tokens_table_cmd = + "CREATE TABLE IF NOT EXISTS pending_tokens ( public_key TEXT, timestamp INTEGER, token BLOB - )", - PENDING_TOKENS_TABLE - ); + )"; conn.execute(&pending_tokens_table_cmd, params![]) .expect("Couldn't create pending tokens table."); // Tokens // The token is stored as hex here (rather than as bytes) because it's more convenient for lookup - let tokens_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let tokens_table_cmd = + "CREATE TABLE IF NOT EXISTS tokens ( public_key TEXT, timestamp INTEGER, token TEXT PRIMARY KEY - )", - TOKENS_TABLE - ); + )"; conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table."); // Files - let files_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let files_table_cmd = + "CREATE TABLE IF NOT EXISTS files ( id TEXT PRIMARY KEY, timestamp INTEGER - )", - FILES_TABLE - ); + )"; conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table."); // User activity table - let user_activity_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let user_activity_table_cmd = + "CREATE TABLE IF NOT EXISTS user_activity ( public_key TEXT PRIMARY KEY, last_active INTEGER NOT NULL - )", - USER_ACTIVITY_TABLE, - ); + )"; conn.execute(&user_activity_table_cmd, params![]) .expect("Couldn't create user activity table."); } @@ -213,7 +184,7 @@ async fn prune_tokens() { Ok(conn) => conn, Err(e) => return error!("Couldn't prune tokens due to error: {}.", e), }; - let stmt = format!("DELETE FROM {} WHERE timestamp < (?1)", TOKENS_TABLE); + let stmt = "DELETE FROM tokens WHERE timestamp < (?1)"; let now = chrono::Utc::now().timestamp(); let expiration = now - TOKEN_EXPIRATION; match conn.execute(&stmt, params![expiration]) { @@ -236,7 +207,7 @@ async fn prune_pending_tokens() { Ok(conn) => conn, Err(e) => return error!("Couldn't prune pending tokens due to error: {}.", e), }; - let stmt = format!("DELETE FROM {} WHERE timestamp < (?1)", PENDING_TOKENS_TABLE); + let stmt = "DELETE FROM pending_tokens WHERE timestamp < (?1)"; let now = chrono::Utc::now().timestamp(); let expiration = now - PENDING_TOKEN_EXPIRATION; match conn.execute(&stmt, params![expiration]) { @@ -257,7 +228,7 @@ fn get_expired_file_ids( error!("Couldn't get database connection to prune files due to error: {}.", e); })?; // Get the IDs of the files to delete - let raw_query = format!("SELECT id FROM {} WHERE timestamp < (?1)", FILES_TABLE); + let raw_query = "SELECT id FROM files WHERE timestamp < (?1)"; let mut query = conn.prepare(&raw_query).map_err(|e| { error!("Couldn't prepare query to prune files due to error: {}.", e); @@ -311,7 +282,7 @@ pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, fil // Remove the file records from the database // FIXME: It'd be great to do this in a single statement, but apparently this is not supported very well for id in ids { - let stmt = format!("DELETE FROM {} WHERE id = (?1)", FILES_TABLE); + let stmt = "DELETE FROM files WHERE id = (?1)"; match conn.execute(&stmt, params![id]) { Ok(_) => (), Err(e) => { @@ -355,14 +326,12 @@ pub fn perform_migration() { return error!("Couldn't get all room IDs."); } }; - let create_tokens_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + let create_tokens_table_cmd = + "CREATE TABLE IF NOT EXISTS tokens ( public_key TEXT, timestamp INTEGER, token TEXT PRIMARY KEY - )", - TOKENS_TABLE - ); + )"; let migrations = Migrations::new(vec![M::up("DROP TABLE tokens"), M::up(&create_tokens_table_cmd)]); for room in rooms { @@ -379,7 +348,7 @@ fn get_all_room_ids() -> Result, Error> { // Get a database connection let conn = MAIN_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database - let raw_query = format!("SELECT id FROM {}", MAIN_TABLE); + let raw_query = "SELECT id FROM main"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![], |row| row.get(0)) { Ok(rows) => rows, diff --git a/src/tests.rs b/src/tests.rs index 09ecd4c..d623079 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -89,7 +89,7 @@ async fn test_file_handling() { .unwrap(); // Check that there's a file record let conn = pool.get().unwrap(); - let raw_query = format!("SELECT id FROM {}", storage::FILES_TABLE); + let raw_query = "SELECT id FROM files"; let id_as_string: String = conn.query_row(&raw_query, params![], |row| Ok(row.get(0)?)).unwrap(); let id = id_as_string.parse::().unwrap(); @@ -107,7 +107,7 @@ async fn test_file_handling() { fs::read(format!("files/{}_files/{}", test_room_id, id)).unwrap_err(); // Check that the file record is also gone let conn = pool.get().unwrap(); - let raw_query = format!("SELECT id FROM {}", storage::FILES_TABLE); + let raw_query = "SELECT id FROM files"; let result: Result = conn.query_row(&raw_query, params![], |row| Ok(row.get(0)?)); // It should be gone now result.unwrap_err(); From 3d99ba221a0c6e820266fad479e48fc07ad14877 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 7 Sep 2021 14:38:32 -0300 Subject: [PATCH 6/9] Rust version compatibility Because it should still work even if your rustc is more than 5 minutes old. --- src/handlers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers.rs b/src/handlers.rs index 561bd1e..fdb3cf0 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -136,7 +136,7 @@ pub async fn store_file( pool: &storage::DatabaseConnectionPool, ) -> Result { // It'd be nice to use the UUID crate for the file ID, but clients want an integer ID - const UPPER_BOUND: u64 = 2u64.pow(53); // JS has trouble if we go higher than this + const UPPER_BOUND: u64 = 1u64 << 53; // JS has trouble if we go higher than this let id: u64 = thread_rng().gen_range(0..UPPER_BOUND); let now = chrono::Utc::now().timestamp(); // Check authorization level if needed From dd02092519580de13e419bcd08c02477e0df9345 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Wed, 8 Sep 2021 22:25:06 -0300 Subject: [PATCH 7/9] Reformat --- src/handlers.rs | 21 ++++++++------------- src/storage.rs | 30 ++++++++++-------------------- 2 files changed, 18 insertions(+), 33 deletions(-) diff --git a/src/handlers.rs b/src/handlers.rs index fdb3cf0..3518723 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -361,8 +361,7 @@ pub fn get_auth_token_challenge( // Note that a given public key can have multiple pending tokens let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let now = chrono::Utc::now().timestamp(); - let stmt = - "INSERT INTO pending_tokens (public_key, timestamp, token) VALUES (?1, ?2, ?3)"; + let stmt = "INSERT INTO pending_tokens (public_key, timestamp, token) VALUES (?1, ?2, ?3)"; let _ = match conn.execute(&stmt, params![hex_public_key, now, token]) { Ok(rows) => rows, Err(e) => { @@ -404,8 +403,7 @@ pub fn claim_auth_token( .ok_or(Error::Unauthorized)?; let token = &pending_tokens[index].1; // Store the claimed token - let stmt = - "INSERT INTO tokens (public_key, timestamp, token) VALUES (?1, ?2, ?3)"; + let stmt = "INSERT INTO tokens (public_key, timestamp, token) VALUES (?1, ?2, ?3)"; let now = chrono::Utc::now().timestamp(); match conn.execute(&stmt, params![public_key, now, hex::encode(token)]) { Ok(_) => (), @@ -606,8 +604,7 @@ fn update_usage_statistics( let public_key = get_public_key_for_auth_token(auth_token, pool)?; let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let now = chrono::Utc::now().timestamp(); - let stmt = - "INSERT OR REPLACE INTO user_activity (public_key, last_active) VALUES(?1, ?2)"; + let stmt = "INSERT OR REPLACE INTO user_activity (public_key, last_active) VALUES(?1, ?2)"; conn.execute(&stmt, params![public_key, now]).map_err(|_| Error::DatabaseFailedInternally)?; return Ok(()); } @@ -676,8 +673,7 @@ pub fn delete_message( }; // Update the deletions table if needed if count > 0 { - let stmt = - "INSERT INTO deleted_messages (deleted_message_id) VALUES (?1)"; + let stmt = "INSERT INTO deleted_messages (deleted_message_id) VALUES (?1)"; match tx.execute(&stmt, params![id]) { Ok(_) => (), Err(e) => { @@ -723,7 +719,8 @@ pub fn get_deleted_messages( if query_params.get("from_server_id").is_some() { raw_query = "SELECT id, deleted_message_id FROM deleted_messages WHERE id > (?1) ORDER BY id ASC LIMIT (?2)"; } else { - raw_query = "SELECT id, deleted_message_id FROM deleted_messages ORDER BY id DESC LIMIT (?2)"; + raw_query = + "SELECT id, deleted_message_id FROM deleted_messages ORDER BY id DESC LIMIT (?2)"; } let mut query = conn.prepare(raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![from_server_id, limit], |row| { @@ -846,8 +843,7 @@ pub fn ban_and_delete_all_messages( ban(public_key, auth_token, pool)?; // Get the IDs of the messages to delete let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let raw_query = - "SELECT id FROM messages WHERE public_key = (?1) AND is_deleted = 0"; + let raw_query = "SELECT id FROM messages WHERE public_key = (?1) AND is_deleted = 0"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![public_key], |row| Ok(row.get(0)?)) { Ok(rows) => rows, @@ -1259,8 +1255,7 @@ fn is_banned(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database - let raw_query = - "SELECT COUNT(public_key) FROM block_list WHERE public_key = (?1)"; + let raw_query = "SELECT COUNT(public_key) FROM block_list WHERE public_key = (?1)"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let rows = match query.query_map(params![public_key], |row| row.get(0)) { Ok(rows) => rows, diff --git a/src/storage.rs b/src/storage.rs index fd8a39e..ae2adf0 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -30,8 +30,7 @@ pub fn create_main_database_if_needed() { } fn create_main_tables_if_needed(conn: &DatabaseConnection) { - let main_table_cmd = - "CREATE TABLE IF NOT EXISTS main ( + let main_table_cmd = "CREATE TABLE IF NOT EXISTS main ( id TEXT PRIMARY KEY, name TEXT, image_id TEXT @@ -74,8 +73,7 @@ pub fn create_room_tables_if_needed(conn: &DatabaseConnection) { // Messages // The `id` field is needed to make `rowid` stable, which is important because otherwise // the `id`s in this table won't correspond to those in the deleted messages table - let messages_table_cmd = - "CREATE TABLE IF NOT EXISTS messages ( + let messages_table_cmd = "CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY, public_key TEXT, timestamp INTEGER, @@ -85,29 +83,25 @@ pub fn create_room_tables_if_needed(conn: &DatabaseConnection) { )"; conn.execute(&messages_table_cmd, params![]).expect("Couldn't create messages table."); // Deleted messages - let deleted_messages_table_cmd = - "CREATE TABLE IF NOT EXISTS deleted_messages ( + let deleted_messages_table_cmd = "CREATE TABLE IF NOT EXISTS deleted_messages ( id INTEGER PRIMARY KEY, deleted_message_id INTEGER )"; conn.execute(&deleted_messages_table_cmd, params![]) .expect("Couldn't create deleted messages table."); // Moderators - let moderators_table_cmd = - "CREATE TABLE IF NOT EXISTS moderators ( + let moderators_table_cmd = "CREATE TABLE IF NOT EXISTS moderators ( public_key TEXT )"; conn.execute(&moderators_table_cmd, params![]).expect("Couldn't create moderators table."); // Block list - let block_list_table_cmd = - "CREATE TABLE IF NOT EXISTS block_list ( + let block_list_table_cmd = "CREATE TABLE IF NOT EXISTS block_list ( public_key TEXT )"; conn.execute(&block_list_table_cmd, params![]).expect("Couldn't create block list table."); // Pending tokens // Note that a given public key can have multiple pending tokens - let pending_tokens_table_cmd = - "CREATE TABLE IF NOT EXISTS pending_tokens ( + let pending_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS pending_tokens ( public_key TEXT, timestamp INTEGER, token BLOB @@ -116,23 +110,20 @@ pub fn create_room_tables_if_needed(conn: &DatabaseConnection) { .expect("Couldn't create pending tokens table."); // Tokens // The token is stored as hex here (rather than as bytes) because it's more convenient for lookup - let tokens_table_cmd = - "CREATE TABLE IF NOT EXISTS tokens ( + let tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens ( public_key TEXT, timestamp INTEGER, token TEXT PRIMARY KEY )"; conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table."); // Files - let files_table_cmd = - "CREATE TABLE IF NOT EXISTS files ( + let files_table_cmd = "CREATE TABLE IF NOT EXISTS files ( id TEXT PRIMARY KEY, timestamp INTEGER )"; conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table."); // User activity table - let user_activity_table_cmd = - "CREATE TABLE IF NOT EXISTS user_activity ( + let user_activity_table_cmd = "CREATE TABLE IF NOT EXISTS user_activity ( public_key TEXT PRIMARY KEY, last_active INTEGER NOT NULL )"; @@ -326,8 +317,7 @@ pub fn perform_migration() { return error!("Couldn't get all room IDs."); } }; - let create_tokens_table_cmd = - "CREATE TABLE IF NOT EXISTS tokens ( + let create_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens ( public_key TEXT, timestamp INTEGER, token TEXT PRIMARY KEY From 1523699dd1e265bb9644c3f8f4ae51333a6929fb Mon Sep 17 00:00:00 2001 From: Sean Darcy Date: Tue, 7 Sep 2021 17:19:19 +1000 Subject: [PATCH 8/9] Replaces room_id variables with typed versions Allows for better protection of the input strings and fits into type checking. --- Cargo.lock | 3 +++ Cargo.toml | 1 + src/handlers.rs | 28 ++++++++++++++++------- src/rpc.rs | 18 ++++++--------- src/storage.rs | 59 ++++++++++++++++++++++++++++++++++++++----------- src/tests.rs | 22 ++++++++++-------- 6 files changed, 90 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e6cc5a..015dc65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "addr2line" version = "0.14.1" @@ -1764,6 +1766,7 @@ dependencies = [ "r2d2_sqlite", "rand 0.8.3", "rand_core 0.5.1", + "regex", "reqwest", "rusqlite", "rusqlite_migration", diff --git a/Cargo.toml b/Cargo.toml index dc3f3a5..dbe2e1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ log4rs = "1.0" octocrab = "0.9" rand = "0.8" rand_core = "0.5" +regex = "1" reqwest = { version = "0.11", features = ["json"] } rusqlite = { version = "0.24", features = ["bundled"] } rusqlite_migration = "0.4" diff --git a/src/handlers.rs b/src/handlers.rs index d30bddb..6a2d874 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -52,7 +52,9 @@ pub async fn create_room(room: models::Room) -> Result { } } // Set up the database - storage::create_database_if_needed(&room.id); + storage::create_database_if_needed( + &storage::RoomId::new(&room.id).ok_or(Error::ValidationFailed)?, + ); // Return info!("Added room with ID: {}", &room.id); let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; @@ -768,7 +770,8 @@ pub fn get_deleted_messages( pub async fn add_moderator_public( body: models::ChangeModeratorRequestBody, auth_token: &str, ) -> Result { - let pool = storage::pool_by_room_id(&body.room_id); + let room_id = storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?; + let pool = storage::pool_by_room_id(&room_id); let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?; if !has_authorization_level { @@ -782,7 +785,9 @@ pub async fn add_moderator( body: models::ChangeModeratorRequestBody, ) -> Result { // Get a database connection - let pool = storage::pool_by_room_id(&body.room_id); + let pool = storage::pool_by_room_id( + &storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?, + ); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the moderator let stmt = format!("INSERT INTO {} (public_key) VALUES (?1)", storage::MODERATORS_TABLE); @@ -802,7 +807,9 @@ pub async fn add_moderator( pub async fn delete_moderator_public( body: models::ChangeModeratorRequestBody, auth_token: &str, ) -> Result { - let pool = storage::pool_by_room_id(&body.room_id); + let pool = storage::pool_by_room_id( + &storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?, + ); let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?; if !has_authorization_level { @@ -816,7 +823,9 @@ pub async fn delete_moderator( body: models::ChangeModeratorRequestBody, ) -> Result { // Get a database connection - let pool = storage::pool_by_room_id(&body.room_id); + let pool = storage::pool_by_room_id( + &storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?, + ); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the moderator let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::MODERATORS_TABLE); @@ -1051,7 +1060,9 @@ pub fn compact_poll( } }; // Get the database connection pool - let pool = storage::pool_by_room_id(&room_id); + let pool = storage::pool_by_room_id( + &storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?, + ); // Get the new messages let mut get_messages_query_params: HashMap = HashMap::new(); if let Some(from_message_server_id) = from_message_server_id { @@ -1160,7 +1171,7 @@ pub async fn get_session_version(platform: &str) -> Result { // not publicly exposed. pub async fn get_stats_for_room( - room: String, query_map: HashMap, + room_id: String, query_map: HashMap, ) -> Result { let now = chrono::Utc::now().timestamp(); let window = match query_map.get("window") { @@ -1174,7 +1185,8 @@ pub async fn get_stats_for_room( }; let lowerbound = upperbound - window; - let pool = storage::pool_by_room_id(&room); + let pool = + storage::pool_by_room_id(&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let raw_query_users = format!( diff --git a/src/rpc.rs b/src/rpc.rs index 9e36213..196a22b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -49,13 +49,13 @@ pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result { // Get the auth token if possible let auth_token = get_auth_token(&rpc_call); // Get the room ID - let room_id = get_room_id(&rpc_call); + let room_id_str = get_room_id(&rpc_call); // Switch on the HTTP method match rpc_call.method.as_ref() { "GET" => { - return handle_get_request(room_id, rpc_call, &path, auth_token, query_params).await + return handle_get_request(room_id_str, rpc_call, &path, auth_token, query_params).await } - "POST" => return handle_post_request(room_id, rpc_call, &path, auth_token).await, + "POST" => return handle_post_request(room_id_str, rpc_call, &path, auth_token).await, "DELETE" => { let pool = get_pool_for_room(&rpc_call)?; return handle_delete_request(rpc_call, &path, auth_token, &pool).await; @@ -408,14 +408,10 @@ async fn handle_delete_request( // Utilities fn get_pool_for_room(rpc_call: &RpcCall) -> Result { - let room_id = match get_room_id(&rpc_call) { - Some(room_id) => room_id, - None => { - warn!("Missing room ID."); - return Err(warp::reject::custom(Error::InvalidRpcCall)); - } - }; - return Ok(storage::pool_by_room_id(&room_id)); + let room_id = get_room_id(&rpc_call).ok_or(Error::ValidationFailed)?; + return Ok(storage::pool_by_room_id( + &storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?, + )); } fn get_auth_token(rpc_call: &RpcCall) -> Option { diff --git a/src/storage.rs b/src/storage.rs index 45b46b5..31edc39 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,3 +1,4 @@ +use regex::Regex; use std::collections::HashMap; use std::path::Path; use std::sync::Mutex; @@ -12,6 +13,30 @@ use super::errors::Error; pub type DatabaseConnection = r2d2::PooledConnection; pub type DatabaseConnectionPool = r2d2::Pool; +#[derive(PartialEq, Eq, Hash)] +pub struct RoomId { + id: String, +} + +lazy_static::lazy_static! { + // Alphanumeric, Decimals "-" & "_" only and must be between 1 - 64 characters + static ref REGULAR_CHARACTERS_ONLY: Regex = Regex::new(r"^[\w-]{1,64}$").unwrap(); +} + +impl RoomId { + pub fn new(room_id: &str) -> Option { + if REGULAR_CHARACTERS_ONLY.is_match(room_id) { + return Some(RoomId { id: room_id.to_string() }); + } else { + return None; + } + } + + pub fn get_id(&self) -> &str { + &self.id + } +} + // Main pub const MAIN_TABLE: &str = "main"; @@ -63,21 +88,21 @@ lazy_static::lazy_static! { static ref POOLS: Mutex> = Mutex::new(HashMap::new()); } -pub fn pool_by_room_id(room_id: &str) -> DatabaseConnectionPool { +pub fn pool_by_room_id(room_id: &RoomId) -> DatabaseConnectionPool { let mut pools = POOLS.lock().unwrap(); - if let Some(pool) = pools.get(room_id) { + if let Some(pool) = pools.get(room_id.get_id()) { return pool.clone(); } else { - let raw_path = format!("rooms/{}.db", room_id); + let raw_path = format!("rooms/{}.db", room_id.get_id()); let path = Path::new(&raw_path); let db_manager = r2d2_sqlite::SqliteConnectionManager::file(path); let pool = r2d2::Pool::new(db_manager).unwrap(); - pools.insert(room_id.to_string(), pool); - return pools[room_id].clone(); + pools.insert(room_id.get_id().to_string(), pool); + return pools[room_id.get_id()].clone(); } } -pub fn create_database_if_needed(room_id: &str) { +pub fn create_database_if_needed(room_id: &RoomId) { let pool = pool_by_room_id(room_id); let conn = pool.get().unwrap(); create_room_tables_if_needed(&conn); @@ -270,7 +295,9 @@ fn get_expired_file_ids( Ok(rows.filter_map(|result| result.ok()).collect()) } -pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, file_expiration: i64) { +pub async fn prune_files_for_room( + pool: &DatabaseConnectionPool, room: &RoomId, file_expiration: i64, +) { let ids = get_expired_file_ids(&pool, file_expiration); match ids { @@ -278,7 +305,7 @@ pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, fil // Delete the files let futs = ids.iter().map(|id| async move { ( - tokio::fs::remove_file(format!("files/{}_files/{}", room, id)).await, + tokio::fs::remove_file(format!("files/{}_files/{}", room.get_id(), id)).await, id.to_owned(), ) }); @@ -289,7 +316,9 @@ pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, fil if let Err(err) = res { error!( "Couldn't delete file: {} from room: {} due to error: {}.", - id, room, err + id, + room.get_id(), + err ); } } @@ -320,7 +349,7 @@ pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, fil }; } // Log the result - info!("Pruned files for room: {}. Took: {:?}", room, now.elapsed()); + info!("Pruned files for room: {}. Took: {:?}", room.get_id(), now.elapsed()); } Ok(_) => { // empty @@ -375,7 +404,7 @@ pub fn perform_migration() { // Utilities -fn get_all_room_ids() -> Result, Error> { +fn get_all_room_ids() -> Result, Error> { // Get a database connection let conn = MAIN_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database @@ -388,7 +417,11 @@ fn get_all_room_ids() -> Result, Error> { return Err(Error::DatabaseFailedInternally); } }; - let ids: Vec = rows.filter_map(|result| result.ok()).collect(); + let room_ids: Vec<_> = rows + .filter_map(|result: Result| result.ok()) + .map(|opt| RoomId::new(&opt)) + .flatten() + .collect(); // Return - return Ok(ids); + return Ok(room_ids); } diff --git a/src/tests.rs b/src/tests.rs index 09ecd4c..0a3fbad 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -75,12 +75,12 @@ async fn test_file_handling() { // Ensure the test room is set up and get a database connection pool let pool = set_up_test_room().await; - let test_room_id = "test_room"; + let test_room_id = storage::RoomId::new("test_room").unwrap(); // Get an auth token let (auth_token, _) = get_auth_token(&pool); // Store the test file handlers::store_file( - Some(test_room_id.to_string()), + Some(test_room_id.get_id().to_string()), TEST_FILE, Some(auth_token.clone()), &pool, @@ -94,17 +94,21 @@ async fn test_file_handling() { conn.query_row(&raw_query, params![], |row| Ok(row.get(0)?)).unwrap(); let id = id_as_string.parse::().unwrap(); // Retrieve the file and check the content - let base64_encoded_file = - handlers::get_file(Some(test_room_id.to_string()), id, Some(auth_token.clone()), &pool) - .await - .unwrap() - .result; + let base64_encoded_file = handlers::get_file( + Some(test_room_id.get_id().to_string()), + id, + Some(auth_token.clone()), + &pool, + ) + .await + .unwrap() + .result; assert_eq!(base64_encoded_file, TEST_FILE); // Prune the file and check that it's gone // Will evaluate to now + 60 - storage::prune_files_for_room(&pool, test_room_id, -60).await; + storage::prune_files_for_room(&pool, &test_room_id, -60).await; // It should be gone now - fs::read(format!("files/{}_files/{}", test_room_id, id)).unwrap_err(); + fs::read(format!("files/{}_files/{}", test_room_id.get_id(), id)).unwrap_err(); // Check that the file record is also gone let conn = pool.get().unwrap(); let raw_query = format!("SELECT id FROM {}", storage::FILES_TABLE); From 6538d558f3c738bede9f8844ce2b36a474dae62d Mon Sep 17 00:00:00 2001 From: Sean Darcy Date: Tue, 14 Sep 2021 14:07:05 +1000 Subject: [PATCH 9/9] Bump version to 0.1.9 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index dbe2e1a..1e9d415 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "session-open-group-server" -version = "0.1.8" +version = "0.1.9" authors = ["Niels Andriesse "] edition = "2018" description = "The Session open group server. Use this to run a custom open group."