Skip to content

Commit

Permalink
PR to track down CI failures (#501)
Browse files Browse the repository at this point in the history
* Use an extended timeout for DKGs specifically

* Add a log statement when message-queue connection fails

* Add a 60 second keep-alive to connections

* Use zalloc for processor/message-queue/coordinator

An additional layer which protects us against edge cases with Zeroizing
(objects which don't support it, or which miss it).

* Add further logs to message-queue

* Further increase re-attempt timeouts in CI

* Remove misplaced continue in message-queue client

Fixes observed CI failures.

* Revert "Further increase re-attempt timeouts in CI"

This reverts commit 3723530.
  • Loading branch information
kayabaNerve authored Jan 4, 2024
1 parent 6c8040f commit 7eb388e
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 15 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions common/request/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ pub struct Client {

impl Client {
/// Build the connector used for this client's outbound connections.
///
/// Starts from a plain `HttpConnector` with a 60 second keep-alive set. When the `tls` feature
/// is enabled, that same connector is wrapped by an `HttpsConnectorBuilder` (native roots,
/// HTTPS or HTTP, HTTP/1) instead of a fresh connector being built.
fn connector() -> Connector {
  let mut res = HttpConnector::new();
  res.set_keepalive(Some(core::time::Duration::from_secs(60)));
  // Wrap the already-configured connector, rather than building a new one, so the keep-alive
  // set above isn't discarded
  #[cfg(feature = "tls")]
  let res = HttpsConnectorBuilder::new()
    .with_native_roots()
    .https_or_http()
    .enable_http1()
    .wrap_connector(res);
  res
}

Expand Down
1 change: 1 addition & 0 deletions coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ frost-schnorrkel = { path = "../crypto/schnorrkel" }

scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }

zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db" }
serai-env = { path = "../common/env" }

Expand Down
4 changes: 4 additions & 0 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ use cosign_evaluator::CosignEvaluator;
#[cfg(test)]
pub mod tests;

/// Global allocator for the coordinator binary: `zalloc::ZeroizingAlloc` wrapping the system
/// allocator.
// NOTE(review): presumably this zeroes memory as it's deallocated, acting as a backstop for
// values which aren't explicitly zeroized - confirm against the zalloc crate.
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);

#[derive(Clone)]
pub struct ActiveTributary<D: Db, P: P2p> {
pub spec: TributarySpec,
Expand Down
6 changes: 5 additions & 1 deletion coordinator/src/tributary/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,16 @@ impl ReattemptDb {
// 5 minutes for attempts 0 ..= 2, 10 minutes for attempts 3 ..= 5, 15 minutes for attempts > 5
// Assumes no event will take longer than 15 minutes, yet grows the time in case there are
// network bandwidth issues
let reattempt_delay = BASE_REATTEMPT_DELAY *
let mut reattempt_delay = BASE_REATTEMPT_DELAY *
((AttemptDb::attempt(txn, genesis, topic)
.expect("scheduling re-attempt for unknown topic") /
3) +
1)
.min(3);
// Allow more time for DKGs since they have an extra round and much more data
if matches!(topic, Topic::Dkg) {
reattempt_delay *= 4;
}
let upon_block = current_block_number + reattempt_delay;

let mut reattempts = Self::get(txn, genesis, upon_block).unwrap_or(vec![]);
Expand Down
1 change: 1 addition & 0 deletions message-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim
# Uses a single threaded runtime since this shouldn't ever be CPU-bound
tokio = { version = "1", default-features = false, features = ["rt", "time", "io-util", "net", "macros"] }

zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db", optional = true }

serai-env = { path = "../common/env" }
Expand Down
35 changes: 28 additions & 7 deletions message-queue/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@ impl MessageQueue {
/// Send a single length-prefixed request over `socket`.
///
/// The request is Borsh-serialized, then framed as a little-endian `u32` length followed by the
/// serialized bytes. Returns `true` if both writes succeeded, or `false` (after logging a
/// warning) if either write failed.
///
/// Panics if serialization fails or if the serialized length doesn't fit in a `u32`.
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool {
  let msg = borsh::to_vec(&msg).unwrap();
  let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else {
    log::warn!("couldn't send the message len");
    return false;
  };
  // The payload is written exactly once; a second write would put bytes on the wire which the
  // length prefix doesn't account for
  let Ok(()) = socket.write_all(&msg).await else {
    log::warn!("couldn't write the message");
    return false;
  };
  true
}

Expand Down Expand Up @@ -118,20 +122,32 @@ impl MessageQueue {
'outer: loop {
if !first {
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
continue;
}
first = false;

let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue };
log::trace!("opening socket to message-queue for next");
let mut socket = match TcpStream::connect(&self.url).await {
Ok(socket) => socket,
Err(e) => {
log::warn!("couldn't connect to message-queue server: {e:?}");
continue;
}
};
log::trace!("opened socket for next");

loop {
if !Self::send(&mut socket, msg.clone()).await {
continue 'outer;
}
let Ok(status) = socket.read_u8().await else {
continue 'outer;
let status = match socket.read_u8().await {
Ok(status) => status,
Err(e) => {
log::warn!("couldn't read status u8: {e:?}");
continue 'outer;
}
};
// If there wasn't a message, check again in 1s
// TODO: Use a notification system here
if status == 0 {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
continue;
Expand All @@ -143,12 +159,17 @@ impl MessageQueue {
// Timeout after 5 seconds in case there's an issue with the length handling
let Ok(msg) = tokio::time::timeout(core::time::Duration::from_secs(5), async {
// Read the message length
let Ok(len) = socket.read_u32_le().await else {
return vec![];
let len = match socket.read_u32_le().await {
Ok(len) => len,
Err(e) => {
log::warn!("couldn't read len: {e:?}");
return vec![];
}
};
let mut buf = vec![0; usize::try_from(len).unwrap()];
// Read the message
let Ok(_) = socket.read_exact(&mut buf).await else {
log::warn!("couldn't read the message");
return vec![];
};
buf
Expand Down
10 changes: 7 additions & 3 deletions message-queue/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
mod messages;
mod queue;

pub(crate) use std::{
sync::{Arc, RwLock},
collections::HashMap,
Expand Down Expand Up @@ -38,6 +35,13 @@ mod clippy {
}
pub(crate) use self::clippy::*;

mod messages;
mod queue;

/// Global allocator for the message-queue binary: `zalloc::ZeroizingAlloc` wrapping the system
/// allocator.
// NOTE(review): presumably this zeroes memory as it's deallocated, acting as a backstop for
// values which aren't explicitly zeroized - confirm against the zalloc crate.
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);

// queue RPC method
/*
Queues a message to be delivered from a processor to a coordinator, or vice versa.
Expand Down
1 change: 1 addition & 0 deletions processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ log = { version = "0.4", default-features = false, features = ["std"] }
env_logger = { version = "0.10", default-features = false, features = ["humantime"], optional = true }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }

zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db", optional = true }
serai-env = { path = "../common/env", optional = true }
# TODO: Replace with direct usage of primitives
Expand Down
6 changes: 6 additions & 0 deletions processor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ use multisigs::{MultisigEvent, MultisigManager};
#[cfg(test)]
mod tests;

/// Global allocator for the processor binary: `zalloc::ZeroizingAlloc` wrapping the system
/// allocator.
// NOTE(review): presumably this zeroes memory as it's deallocated, acting as a backstop for
// values which aren't explicitly zeroized - confirm against the zalloc crate.
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);

// Items which are mutably borrowed by Tributary.
// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't
// violated.
Expand Down Expand Up @@ -559,6 +563,8 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
loop {
let mut txn = raw_db.txn();

log::trace!("new db txn in run");

let mut outer_msg = None;

tokio::select! {
Expand Down

0 comments on commit 7eb388e

Please sign in to comment.