Skip to content

Commit

Permalink
Correct removal of binaries feature from message-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
kayabaNerve committed Nov 29, 2023
1 parent 8378a10 commit 83a045d
Showing 1 changed file with 121 additions and 134 deletions.
255 changes: 121 additions & 134 deletions message-queue/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,157 +1,149 @@
#[cfg(feature = "binaries")]
mod messages;
#[cfg(feature = "binaries")]
mod queue;

#[cfg(feature = "binaries")]
mod binaries {
pub(crate) use std::{
sync::{Arc, RwLock},
collections::HashMap,
};
pub(crate) use std::{
sync::{Arc, RwLock},
collections::HashMap,
};

pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
pub(crate) use schnorr_signatures::SchnorrSignature;
pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
pub(crate) use schnorr_signatures::SchnorrSignature;

pub(crate) use serai_primitives::NetworkId;
pub(crate) use serai_primitives::NetworkId;

pub(crate) use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
};
pub(crate) use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
};

use serai_db::{Get, DbTxn, Db as DbTrait};
use serai_db::{Get, DbTxn, Db as DbTrait};

pub(crate) use crate::messages::*;
pub(crate) use crate::messages::*;

pub(crate) use crate::queue::Queue;
pub(crate) use crate::queue::Queue;

#[cfg(all(feature = "redb", not(feature = "rocksdb")))]
pub(crate) type Db = Arc<serai_db::Redb>;
#[cfg(feature = "rocksdb")]
pub(crate) type Db = serai_db::RocksDB;
#[cfg(all(feature = "redb", not(feature = "rocksdb")))]
pub(crate) type Db = Arc<serai_db::Redb>;
#[cfg(feature = "rocksdb")]
pub(crate) type Db = serai_db::RocksDB;

#[allow(clippy::type_complexity)]
mod clippy {
use super::*;
use once_cell::sync::Lazy;
pub(crate) static KEYS: Lazy<Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
pub(crate) static QUEUES: Lazy<Arc<RwLock<HashMap<(Service, Service), RwLock<Queue<Db>>>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
#[allow(clippy::type_complexity)]
mod clippy {
use super::*;
use once_cell::sync::Lazy;
pub(crate) static KEYS: Lazy<Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
pub(crate) static QUEUES: Lazy<Arc<RwLock<HashMap<(Service, Service), RwLock<Queue<Db>>>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
}
pub(crate) use self::clippy::*;

// queue RPC method
/*
Queues a message to be delivered from a processor to a coordinator, or vice versa.
Messages are authenticated to be coming from the claimed service. Recipient services SHOULD
independently verify signatures.
The metadata specifies an intent. Only one message, for a specified intent, will be delivered.
This allows services to safely send messages multiple times without them being delivered
multiple times.
The message will be ordered by this service, with the order having no guarantees other than
successful ordering by the time this call returns.
*/
pub(crate) fn queue_message(
db: &mut Db,
meta: Metadata,
msg: Vec<u8>,
sig: SchnorrSignature<Ristretto>,
) {
{
let from = (*KEYS).read().unwrap()[&meta.from];
assert!(
sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R))
);
}
pub(crate) use self::clippy::*;

// queue RPC method
/*
Queues a message to be delivered from a processor to a coordinator, or vice versa.
Messages are authenticated to be coming from the claimed service. Recipient services SHOULD
independently verify signatures.
The metadata specifies an intent. Only one message, for a specified intent, will be delivered.
This allows services to safely send messages multiple times without them being delivered
multiple times.
The message will be ordered by this service, with the order having no guarantees other than
successful ordering by the time this call returns.
*/
pub(crate) fn queue_message(
db: &mut Db,
meta: Metadata,
msg: Vec<u8>,
sig: SchnorrSignature<Ristretto>,
) {
{
let from = (*KEYS).read().unwrap()[&meta.from];
assert!(
sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R))
);
}

// Assert one, and only one of these, is the coordinator
assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator));
// Assert one, and only one of these, is the coordinator
assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator));

// Verify (from, to, intent) hasn't been prior seen
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
}
fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap())
}
let mut txn = db.txn();
let intent_key = intent_key(meta.from, meta.to, &meta.intent);
if Get::get(&txn, &intent_key).is_some() {
log::warn!(
"Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}",
meta.from,
meta.to,
hex::encode(&meta.intent)
);
return;
}
DbTxn::put(&mut txn, intent_key, []);

// Queue it
let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message(
&mut txn,
QueuedMessage {
from: meta.from,
// Temporary value which queue_message will override
id: u64::MAX,
msg,
sig: sig.serialize(),
},
// Verify (from, to, intent) hasn't been prior seen
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
}
fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap())
}
let mut txn = db.txn();
let intent_key = intent_key(meta.from, meta.to, &meta.intent);
if Get::get(&txn, &intent_key).is_some() {
log::warn!(
"Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}",
meta.from,
meta.to,
hex::encode(&meta.intent)
);

log::info!("Queued message. From: {:?} To: {:?} ID: {id}", meta.from, meta.to);
DbTxn::commit(txn);
return;
}
DbTxn::put(&mut txn, intent_key, []);

// Queue it
let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message(
&mut txn,
QueuedMessage {
from: meta.from,
// Temporary value which queue_message will override
id: u64::MAX,
msg,
sig: sig.serialize(),
},
);

log::info!("Queued message. From: {:?} To: {:?} ID: {id}", meta.from, meta.to);
DbTxn::commit(txn);
}

// next RPC method
/*
Gets the next message in queue for the named services.
This is not authenticated due to the fact every nonce would have to be saved to prevent
replays, or a challenge-response protocol implemented. Neither are worth doing when there
should be no sensitive data on this server.
*/
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
let queue_outer = (*QUEUES).read().unwrap();
let queue = queue_outer[&(from, to)].read().unwrap();
let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
queue.get_message(next)
}
// next RPC method
/*
Gets the next message in queue for the named services.
This is not authenticated due to the fact every nonce would have to be saved to prevent
replays, or a challenge-response protocol implemented. Neither are worth doing when there
should be no sensitive data on this server.
*/
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
let queue_outer = (*QUEUES).read().unwrap();
let queue = queue_outer[&(from, to)].read().unwrap();
let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
queue.get_message(next)
}

// ack RPC method
/*
Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
message.
*/
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
{
let to_key = (*KEYS).read().unwrap()[&to];
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
}
// ack RPC method
/*
Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
message.
*/
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
{
let to_key = (*KEYS).read().unwrap()[&to];
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
}

// Is it:
// The acknowledged message should be > last acknowledged OR
// The acknowledged message should be >=
// It's the first if we save messages as acknowledged before acknowledging them
// It's the second if we acknowledge messages before saving them as acknowledged
// TODO: Check only a proper message is being acked
// Is it:
// The acknowledged message should be > last acknowledged OR
// The acknowledged message should be >=
// It's the first if we save messages as acknowledged before acknowledging them
// It's the second if we acknowledge messages before saving them as acknowledged
// TODO: Check only a proper message is being acked

log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id);
log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id);

(*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
}
(*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
}

#[cfg(feature = "binaries")]
#[tokio::main(flavor = "current_thread")]
async fn main() {
use binaries::*;

// Override the panic handler with one which will panic if any tokio task panics
{
let existing = std::panic::take_hook();
Expand Down Expand Up @@ -285,8 +277,3 @@ async fn main() {
});
}
}

#[cfg(not(feature = "binaries"))]
fn main() {
panic!("To run binaries, please build with `--feature binaries`.");
}

0 comments on commit 83a045d

Please sign in to comment.