Skip to content
This repository has been archived by the owner on Dec 22, 2021. It is now read-only.

Adds a pinned message to the rooms table #21

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 127 additions & 11 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection};
use super::crypto;
use super::errors::Error;
use super::models;
use super::models::{OldMessage, Room, User};
use super::models::{OldMessage, PinnedMessage, Room, User};
use super::rpc;
use super::storage::{self, db_error};

Expand Down Expand Up @@ -620,13 +620,24 @@ pub fn get_auth_token_challenge(public_key: &str) -> Result<models::Challenge, R

/// Inserts a message into the database.
pub fn insert_message(
room: Room,
user: User,
room: &Room,
user: &User,
data: &[u8],
signature: &[u8],
) -> Result<Response, Rejection> {
let mut conn = storage::get_conn()?;
let tx = storage::get_transaction(&mut conn)?;
let message = insert_message_impl(&mut conn, room, user, data, signature)?;
let response = json!({ "status_code": StatusCode::OK.as_u16(), "message": message });
Ok(warp::reply::json(&response).into_response())
}
pub fn insert_message_impl(
conn: &mut PooledConnection<SqliteConnectionManager>,
room: &Room,
user: &User,
data: &[u8],
signature: &[u8],
) -> Result<OldMessage, Rejection> {
let tx = storage::get_transaction(conn)?;
require_authorization(
&tx,
&user,
Expand Down Expand Up @@ -678,8 +689,7 @@ pub fn insert_message(
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}

let response = json!({ "status_code": StatusCode::OK.as_u16(), "message": message });
Ok(warp::reply::json(&response).into_response())
Ok(message)
}

// TODO FIXME: The paging mechanism here is really odd: you can either get the last 256 messages,
Expand Down Expand Up @@ -865,7 +875,8 @@ pub fn add_moderator_public(
&room,
AuthorizationRequired { admin: true, ..Default::default() },
)?;
add_moderator_impl(session_id, admin, room)
let mut conn = storage::get_conn()?;
add_moderator_impl(&mut conn, session_id, admin, &room)
}

// TODO: need ability to add *global* server moderators/admins (which, of course, can only be done
Expand All @@ -875,22 +886,24 @@ pub fn add_moderator_public(
pub async fn add_moderator(
body: models::ChangeModeratorRequestBody,
) -> Result<Response, Rejection> {
let mut conn = storage::get_conn()?;
add_moderator_impl(
&mut conn,
&body.session_id,
body.admin.unwrap_or(false),
storage::get_room_from_token(&*storage::get_conn()?, &body.room_token)?,
&storage::get_room_from_token(&*storage::get_conn()?, &body.room_token)?,
)
}

pub fn add_moderator_impl(
conn: &mut PooledConnection<SqliteConnectionManager>,
session_id: &str,
admin: bool,
room: Room,
room: &Room,
) -> Result<Response, Rejection> {
require_session_id(session_id)?;

let mut conn = storage::get_conn()?;
let tx = storage::get_transaction(&mut conn)?;
let tx = storage::get_transaction(conn)?;

if let Err(e) = tx
.prepare_cached("INSERT OR IGNORE INTO users (session_id) VALUES (?)")
Expand Down Expand Up @@ -1192,6 +1205,109 @@ pub fn get_banned_public_keys(user: &User, room: &Room) -> Result<Response, Reje
.into_response())
}

/// Pins a single message id for the room, deletes all previous pinned messages. Must be called by
/// a moderator
pub fn pin_message(id: i64, user: &User, room: &Room) -> Result<Response, Rejection> {
// Get a connection
let mut conn = storage::get_conn()?;
pin_message_impl(&mut conn, id, &user, &room)?;
let json = models::StatusCode { status_code: StatusCode::OK.as_u16() };
return Ok(warp::reply::json(&json).into_response());
}

pub fn pin_message_impl(
conn: &mut PooledConnection<SqliteConnectionManager>,
message: i64,
user: &User,
room: &Room,
) -> Result<PinnedMessage, Rejection> {
// Check authorization level
require_authorization(
&conn,
user,
room,
AuthorizationRequired { moderator: true, ..Default::default() },
)?;
let _ = delete_pinned_message_impl(conn, user, room)?;

let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?;
let pinned_message = match tx
.prepare_cached("INSERT into pinned_messages (room, message) VALUES (?1, ?2) returning *")
.map_err(db_error)?
.query_row(params![room.id, message], PinnedMessage::from_row)
{
Ok(m) => m,
Err(e) => {
error!("Couldn't pin message: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
tx.commit().map_err(|_| Error::DatabaseFailedInternally)?;

Ok(pinned_message)
}

pub fn get_pinned_message(user: &User, room: &Room) -> Result<PinnedMessage, Rejection> {
// Get a connection
let mut conn = storage::get_conn()?;
get_pinned_message_impl(&mut conn, &user, &room)
}
pub fn get_pinned_message_impl(
conn: &PooledConnection<SqliteConnectionManager>,
user: &User,
room: &Room,
) -> Result<PinnedMessage, Rejection> {
// Check authorization level
require_authorization(
&conn,
user,
room,
AuthorizationRequired { moderator: false, ..Default::default() },
)?;
// Query the database
let raw_query = format!("SELECT * FROM pinned_messages ORDER BY updated DESC LIMIT 1");
let pinned_message = conn
.prepare_cached(&raw_query)
.map_err(db_error)?
.query_row(params![], PinnedMessage::from_row)
.map_err(db_error)?;

return Ok(pinned_message);
}

/// Deletes all pinned messages of a room, must be called by a moderator
pub fn delete_pinned_message(user: &User, room: &Room) -> Result<Response, Rejection> {
// Get a connection
let mut conn = storage::get_conn()?;
return delete_pinned_message_impl(&mut conn, &user, &room);
}

pub fn delete_pinned_message_impl(
conn: &mut PooledConnection<SqliteConnectionManager>,
user: &User,
room: &Room,
) -> Result<Response, Rejection> {
// Check authorization level
require_authorization(
&conn,
user,
room,
AuthorizationRequired { moderator: true, ..Default::default() },
)?;
let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?;
// Delete the message if it's present
let stmt = format!("DELETE from pinned_messages WHERE room = (?1)");
if let Err(err) = tx.execute(&stmt, [room.id]) {
error!("Couldn't delete pinned message due to error: {}.", err);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
};
// Commit
tx.commit().map_err(|_| Error::DatabaseFailedInternally)?;
// Return
let json = models::StatusCode { status_code: StatusCode::OK.as_u16() };
return Ok(warp::reply::json(&json).into_response());
}

// General

/// Returns members who have accessed the given room at least once in the past 7 days.
Expand Down
19 changes: 18 additions & 1 deletion src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,24 @@ pub struct DeletedMessage {
pub deleted_message_id: i64,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct PinnedMessage {
pub room_id: i64,
pub message: i64,
pub timestamp: i64,
}

impl PinnedMessage {
pub fn from_row(row: &rusqlite::Row) -> Result<PinnedMessage, rusqlite::Error> {
return Ok(PinnedMessage {
room_id: row.get(row.column_index("room")?)?,
message: row.get(row.column_index("message")?)?,
timestamp: row.get(row.column_index("updated")?)?,
});
}
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Room {
#[serde(skip)]
pub id: i64,
Expand Down
48 changes: 45 additions & 3 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::collections::HashMap;

use ed25519_dalek;
use log::warn;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::json;
use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection};

use super::crypto;
use super::errors::Error;
use super::handlers;
use super::models::{self, Room, User};
use super::models::{self, PinnedMessage, Room, User};
use super::storage;

#[allow(dead_code)]
Expand Down Expand Up @@ -264,6 +264,21 @@ async fn handle_get_request(
.into_response());
// FIXME: can drop `.into_response()` I think?
}
"pinned_message" => {
reject_if_file_server_mode(path)?;
let pinned_message = match handlers::get_pinned_message(&user, &room) {
Ok(pinned_message) => Some(pinned_message),
_ => None,
};
#[derive(Debug, Deserialize, Serialize)]
struct Response {
status_code: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pinned_message: Option<PinnedMessage>,
}
let response = Response { status_code: StatusCode::OK.as_u16(), pinned_message };
return Ok(warp::reply::json(&response).into_response());
}
"deleted_messages" => {
reject_if_file_server_mode(path)?;
let deletions = handlers::get_deleted_messages(query_params, user, room)?;
Expand Down Expand Up @@ -398,7 +413,7 @@ async fn handle_post_request(
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::insert_message(room, user, &message.data, &message.signature);
return handlers::insert_message(&room, &user, &message.data, &message.signature);
}

"files" => {
Expand Down Expand Up @@ -500,6 +515,21 @@ async fn handle_post_request(
};
return handlers::delete_messages(json.ids, &user, &room);
}
"pin_message" => {
reject_if_file_server_mode(path)?;
#[derive(Debug, Deserialize)]
struct JSON {
id: i64,
}
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
return handlers::pin_message(json.id, &user, &room);
}
_ => {
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(Error::InvalidRpcCall.into());
Expand Down Expand Up @@ -572,6 +602,18 @@ async fn handle_delete_request(
};
return handlers::delete_moderator_public(&session_id, user, room);
}
// DELETE /pinned_message
if path == "pinned_message" {
reject_if_file_server_mode(path)?;
darcys22 marked this conversation as resolved.
Show resolved Hide resolved
let room = match get_room(&rpc_call)? {
Some(room) => room,
None => {
warn!("Missing room ID.");
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::delete_pinned_message(&user, &room);
}
// Unrecognized endpoint
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(Error::InvalidRpcCall.into());
Expand Down
4 changes: 2 additions & 2 deletions src/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ END;

CREATE TABLE pinned_messages (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
message INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
message INTEGER NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
updated INTEGER NOT NULL DEFAULT 0, /* set to the room's `info_updated` counter when pinned (used for ordering). */
PRIMARY KEY(room, message)
);
Expand Down Expand Up @@ -262,7 +262,7 @@ CREATE TRIGGER room_metadata_pinned_add AFTER INSERT ON pinned_messages
FOR EACH ROW
BEGIN
UPDATE rooms SET info_updates = info_updates + 1 WHERE id = NEW.room;
UPDATE pinned_messages SET updated = (SELECT info_updates FROM rooms WHERE id = NEW.room) WHERE id = NEW.id;
UPDATE pinned_messages SET updated = (SELECT info_updates FROM rooms WHERE id = NEW.room) WHERE message = NEW.message;
END;
CREATE TRIGGER room_metadata_pinned_remove AFTER DELETE ON pinned_messages
FOR EACH ROW
Expand Down
Loading