diff --git a/Cargo.lock b/Cargo.lock index e60bcf814..07ade92c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7344,6 +7344,7 @@ dependencies = [ "sp-runtime", "tokio", "tributary-chain", + "zalloc", "zeroize", ] @@ -7486,6 +7487,7 @@ dependencies = [ "serai-env", "serai-primitives", "tokio", + "zalloc", "zeroize", ] @@ -7607,6 +7609,7 @@ dependencies = [ "sp-application-crypto", "thiserror", "tokio", + "zalloc", "zeroize", ] diff --git a/common/request/src/lib.rs b/common/request/src/lib.rs index 63fb7c8d5..2c56db945 100644 --- a/common/request/src/lib.rs +++ b/common/request/src/lib.rs @@ -49,11 +49,14 @@ pub struct Client { impl Client { fn connector() -> Connector { + let mut res = HttpConnector::new(); + res.set_keepalive(Some(core::time::Duration::from_secs(60))); #[cfg(feature = "tls")] - let res = - HttpsConnectorBuilder::new().with_native_roots().https_or_http().enable_http1().build(); - #[cfg(not(feature = "tls"))] - let res = HttpConnector::new(); + let res = HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http1() + .wrap_connector(res); res } diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index c049360b0..12f8e763f 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -32,6 +32,7 @@ frost-schnorrkel = { path = "../crypto/schnorrkel" } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } +zalloc = { path = "../common/zalloc" } serai-db = { path = "../common/db" } serai-env = { path = "../common/env" } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b3599024d..d0f387411 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -63,6 +63,10 @@ use cosign_evaluator::CosignEvaluator; #[cfg(test)] pub mod tests; +#[global_allocator] +static ALLOCATOR: zalloc::ZeroizingAlloc = + zalloc::ZeroizingAlloc(std::alloc::System); + #[derive(Clone)] pub struct ActiveTributary { pub spec: TributarySpec, diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 7f9de7b2b..27fef1f0e 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -141,12 +141,16 @@ impl ReattemptDb { // 5 minutes for attempts 0 ..= 2, 10 minutes for attempts 3 ..= 5, 15 minutes for attempts > 5 // Assumes no event will take longer than 15 minutes, yet grows the time in case there are // network bandwidth issues - let reattempt_delay = BASE_REATTEMPT_DELAY * + let mut reattempt_delay = BASE_REATTEMPT_DELAY * ((AttemptDb::attempt(txn, genesis, topic) .expect("scheduling re-attempt for unknown topic") / 3) + 1) .min(3); + // Allow more time for DKGs since they have an extra round and much more data + if matches!(topic, Topic::Dkg) { + reattempt_delay *= 4; + } let upon_block = current_block_number + reattempt_delay; let mut reattempts = Self::get(txn, genesis, upon_block).unwrap_or(vec![]); diff --git a/message-queue/Cargo.toml b/message-queue/Cargo.toml index 959701635..9eeaa5ce2 100644 --- a/message-queue/Cargo.toml +++ b/message-queue/Cargo.toml @@ -40,6 +40,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim # Uses a single threaded runtime since this shouldn't ever be CPU-bound tokio = { version = "1", default-features = false, features = ["rt", "time", "io-util", "net", "macros"] } +zalloc = { path = "../common/zalloc" } serai-db = { path = "../common/db", optional = true } serai-env = { path = "../common/env" } diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs index e1068082a..3aaf5a24e 100644 --- a/message-queue/src/client.rs +++ b/message-queue/src/client.rs @@ -68,9 +68,13 @@ impl MessageQueue { async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool { let msg = borsh::to_vec(&msg).unwrap(); let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { + log::warn!("couldn't send the message len"); + return false; + }; + let Ok(()) = socket.write_all(&msg).await else { + log::warn!("couldn't write the message"); return false; }; - let Ok(()) = socket.write_all(&msg).await else { return false }; true } @@ -118,20 +122,32 @@ impl MessageQueue { 'outer: loop { if !first { tokio::time::sleep(core::time::Duration::from_secs(5)).await; - continue; } first = false; - let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; + log::trace!("opening socket to message-queue for next"); + let mut socket = match TcpStream::connect(&self.url).await { + Ok(socket) => socket, + Err(e) => { + log::warn!("couldn't connect to message-queue server: {e:?}"); + continue; + } + }; + log::trace!("opened socket for next"); loop { if !Self::send(&mut socket, msg.clone()).await { continue 'outer; } - let Ok(status) = socket.read_u8().await else { - continue 'outer; + let status = match socket.read_u8().await { + Ok(status) => status, + Err(e) => { + log::warn!("couldn't read status u8: {e:?}"); + continue 'outer; + } }; // If there wasn't a message, check again in 1s + // TODO: Use a notification system here if status == 0 { tokio::time::sleep(core::time::Duration::from_secs(1)).await; continue; @@ -143,12 +159,17 @@ impl MessageQueue { // Timeout after 5 seconds in case there's an issue with the length handling let Ok(msg) = tokio::time::timeout(core::time::Duration::from_secs(5), async { // Read the message length - let Ok(len) = socket.read_u32_le().await else { - return vec![]; + let len = match socket.read_u32_le().await { + Ok(len) => len, + Err(e) => { + log::warn!("couldn't read len: {e:?}"); + return vec![]; + } }; let mut buf = vec![0; usize::try_from(len).unwrap()]; // Read the message let Ok(_) = socket.read_exact(&mut buf).await else { + log::warn!("couldn't read the message"); return vec![]; }; buf diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index 754528af3..c43cc3c84 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -1,6 +1,3 @@ -mod messages; -mod queue; - pub(crate) use std::{ sync::{Arc, RwLock}, collections::HashMap, @@ -38,6 +35,13 @@ mod clippy { } pub(crate) use self::clippy::*; +mod messages; +mod queue; + +#[global_allocator] +static ALLOCATOR: zalloc::ZeroizingAlloc = + zalloc::ZeroizingAlloc(std::alloc::System); + // queue RPC method /* Queues a message to be delivered from a processor to a coordinator, or vice versa. diff --git a/processor/Cargo.toml b/processor/Cargo.toml index e3c284770..a213b983c 100644 --- a/processor/Cargo.toml +++ b/processor/Cargo.toml @@ -54,6 +54,7 @@ log = { version = "0.4", default-features = false, features = ["std"] } env_logger = { version = "0.10", default-features = false, features = ["humantime"], optional = true } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } +zalloc = { path = "../common/zalloc" } serai-db = { path = "../common/db", optional = true } serai-env = { path = "../common/env", optional = true } # TODO: Replace with direct usage of primitives diff --git a/processor/src/main.rs b/processor/src/main.rs index 7eb0fcd70..8aa7ef196 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -61,6 +61,10 @@ use multisigs::{MultisigEvent, MultisigManager}; #[cfg(test)] mod tests; +#[global_allocator] +static ALLOCATOR: zalloc::ZeroizingAlloc = + zalloc::ZeroizingAlloc(std::alloc::System); + // Items which are mutably borrowed by Tributary. // Any exceptions to this have to be carefully monitored in order to ensure consistency isn't // violated. @@ -559,6 +563,8 @@ async fn run(mut raw_db: D, network: N, mut loop { let mut txn = raw_db.txn(); + log::trace!("new db txn in run"); + let mut outer_msg = None; tokio::select! {