diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index e2e0795a6..f6200793c 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -1,157 +1,149 @@ -#[cfg(feature = "binaries")] mod messages; -#[cfg(feature = "binaries")] mod queue; -#[cfg(feature = "binaries")] -mod binaries { - pub(crate) use std::{ - sync::{Arc, RwLock}, - collections::HashMap, - }; +pub(crate) use std::{ + sync::{Arc, RwLock}, + collections::HashMap, +}; - pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; - pub(crate) use schnorr_signatures::SchnorrSignature; +pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; +pub(crate) use schnorr_signatures::SchnorrSignature; - pub(crate) use serai_primitives::NetworkId; +pub(crate) use serai_primitives::NetworkId; - pub(crate) use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpListener, - }; +pub(crate) use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, +}; - use serai_db::{Get, DbTxn, Db as DbTrait}; +use serai_db::{Get, DbTxn, Db as DbTrait}; - pub(crate) use crate::messages::*; +pub(crate) use crate::messages::*; - pub(crate) use crate::queue::Queue; +pub(crate) use crate::queue::Queue; - #[cfg(all(feature = "redb", not(feature = "rocksdb")))] - pub(crate) type Db = Arc; - #[cfg(feature = "rocksdb")] - pub(crate) type Db = serai_db::RocksDB; +#[cfg(all(feature = "redb", not(feature = "rocksdb")))] +pub(crate) type Db = Arc; +#[cfg(feature = "rocksdb")] +pub(crate) type Db = serai_db::RocksDB; - #[allow(clippy::type_complexity)] - mod clippy { - use super::*; - use once_cell::sync::Lazy; - pub(crate) static KEYS: Lazy::G>>>> = - Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); - pub(crate) static QUEUES: Lazy>>>>> = - Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); +#[allow(clippy::type_complexity)] +mod clippy { + use super::*; + use once_cell::sync::Lazy; + pub(crate) static KEYS: Lazy::G>>>> = + Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); + pub(crate) static QUEUES: Lazy>>>>> = + Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); +} +pub(crate) use self::clippy::*; + +// queue RPC method +/* + Queues a message to be delivered from a processor to a coordinator, or vice versa. + + Messages are authenticated to be coming from the claimed service. Recipient services SHOULD + independently verify signatures. + + The metadata specifies an intent. Only one message, for a specified intent, will be delivered. + This allows services to safely send messages multiple times without them being delivered + multiple times. + + The message will be ordered by this service, with the order having no guarantees other than + successful ordering by the time this call returns. +*/ +pub(crate) fn queue_message( + db: &mut Db, + meta: Metadata, + msg: Vec, + sig: SchnorrSignature, +) { + { + let from = (*KEYS).read().unwrap()[&meta.from]; + assert!( + sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R)) + ); } - pub(crate) use self::clippy::*; - - // queue RPC method - /* - Queues a message to be delivered from a processor to a coordinator, or vice versa. - - Messages are authenticated to be coming from the claimed service. Recipient services SHOULD - independently verify signatures. - - The metadata specifies an intent. Only one message, for a specified intent, will be delivered. - This allows services to safely send messages multiple times without them being delivered - multiple times. - - The message will be ordered by this service, with the order having no guarantees other than - successful ordering by the time this call returns. - */ - pub(crate) fn queue_message( - db: &mut Db, - meta: Metadata, - msg: Vec, - sig: SchnorrSignature, - ) { - { - let from = (*KEYS).read().unwrap()[&meta.from]; - assert!( - sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R)) - ); - } - // Assert one, and only one of these, is the coordinator - assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); + // Assert one, and only one of these, is the coordinator + assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); - // Verify (from, to, intent) hasn't been prior seen - fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - [&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat() - } - fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec { - key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap()) - } - let mut txn = db.txn(); - let intent_key = intent_key(meta.from, meta.to, &meta.intent); - if Get::get(&txn, &intent_key).is_some() { - log::warn!( - "Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}", - meta.from, - meta.to, - hex::encode(&meta.intent) - ); - return; - } - DbTxn::put(&mut txn, intent_key, []); - - // Queue it - let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message( - &mut txn, - QueuedMessage { - from: meta.from, - // Temporary value which queue_message will override - id: u64::MAX, - msg, - sig: sig.serialize(), - }, + // Verify (from, to, intent) hasn't been prior seen + fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec { + [&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat() + } + fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec { + key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap()) + } + let mut txn = db.txn(); + let intent_key = intent_key(meta.from, meta.to, &meta.intent); + if Get::get(&txn, &intent_key).is_some() { + log::warn!( + "Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}", + meta.from, + meta.to, + hex::encode(&meta.intent) ); - - log::info!("Queued message. From: {:?} To: {:?} ID: {id}", meta.from, meta.to); - DbTxn::commit(txn); + return; } + DbTxn::put(&mut txn, intent_key, []); + + // Queue it + let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message( + &mut txn, + QueuedMessage { + from: meta.from, + // Temporary value which queue_message will override + id: u64::MAX, + msg, + sig: sig.serialize(), + }, + ); + + log::info!("Queued message. From: {:?} To: {:?} ID: {id}", meta.from, meta.to); + DbTxn::commit(txn); +} - // next RPC method - /* - Gets the next message in queue for the named services. - - This is not authenticated due to the fact every nonce would have to be saved to prevent - replays, or a challenge-response protocol implemented. Neither are worth doing when there - should be no sensitive data on this server. - */ - pub(crate) fn get_next_message(from: Service, to: Service) -> Option { - let queue_outer = (*QUEUES).read().unwrap(); - let queue = queue_outer[&(from, to)].read().unwrap(); - let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); - queue.get_message(next) - } +// next RPC method +/* + Gets the next message in queue for the named services. + + This is not authenticated due to the fact every nonce would have to be saved to prevent + replays, or a challenge-response protocol implemented. Neither are worth doing when there + should be no sensitive data on this server. +*/ +pub(crate) fn get_next_message(from: Service, to: Service) -> Option { + let queue_outer = (*QUEUES).read().unwrap(); + let queue = queue_outer[&(from, to)].read().unwrap(); + let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); + queue.get_message(next) +} - // ack RPC method - /* - Acknowledges a message as received and handled, meaning it'll no longer be returned as the next - message. - */ - pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature) { - { - let to_key = (*KEYS).read().unwrap()[&to]; - assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R))); - } +// ack RPC method +/* + Acknowledges a message as received and handled, meaning it'll no longer be returned as the next + message. +*/ +pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature) { + { + let to_key = (*KEYS).read().unwrap()[&to]; + assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R))); + } - // Is it: - // The acknowledged message should be > last acknowledged OR - // The acknowledged message should be >= - // It's the first if we save messages as acknowledged before acknowledging them - // It's the second if we acknowledge messages before saving them as acknowledged - // TODO: Check only a proper message is being acked + // Is it: + // The acknowledged message should be > last acknowledged OR + // The acknowledged message should be >= + // It's the first if we save messages as acknowledged before acknowledging them + // It's the second if we acknowledge messages before saving them as acknowledged + // TODO: Check only a proper message is being acked - log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id); + log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id); - (*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id) - } + (*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id) } -#[cfg(feature = "binaries")] #[tokio::main(flavor = "current_thread")] async fn main() { - use binaries::*; - // Override the panic handler with one which will panic if any tokio task panics { let existing = std::panic::take_hook(); @@ -285,8 +277,3 @@ async fn main() { }); } } - -#[cfg(not(feature = "binaries"))] -fn main() { - panic!("To run binaries, please build with `--feature binaries`."); -}