Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent messages from taking too long to be sent #902

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
67 changes: 37 additions & 30 deletions chain-signatures/node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::time::{Duration, Instant};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

use near_account_id::AccountId;

#[derive(Debug, Clone, clap::Parser)]
#[group(id = "message_options")]
pub struct Options {
Expand Down Expand Up @@ -46,7 +48,7 @@ pub enum SendError {

async fn send_encrypted<U: IntoUrl>(
from: Participant,
client: &Client,
client: Client,
url: U,
message: Vec<Ciphered>,
request_timeout: Duration,
Expand Down Expand Up @@ -97,14 +99,16 @@ async fn send_encrypted<U: IntoUrl>(
/// Outgoing message queue for a single node.
///
/// Holds messages waiting to be sent together with the bookkeeping the send
/// path needs: the set of participant-count summaries already seen, the
/// account id used as a metrics label, and the configured message options.
pub struct MessageQueue {
// Pending entries, one tuple per message: the participant info, the message
// itself, and an `Instant`.
// NOTE(review): the `Instant` is presumably the time the message was queued —
// confirm against `push`, which is not visible here.
deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>,
// Debug-formatted participant counters that have already been recorded; the
// send path inserts `format!("{participant_counter:?}")` here and only acts
// on a summary the first time it is inserted.
seen_counts: HashSet<String>,
// Account id cloned from the id handed to `new`; used as the label value
// when observing `SEND_ENCRYPTED_LATENCY`.
account_id: AccountId,
// Options supplied to `new`. The send path reads `timeout` from it and
// interprets the value as milliseconds for the per-request timeout.
message_options: Options,
}

impl MessageQueue {
pub fn new(options: Options) -> Self {
pub fn new(id: &AccountId, options: Options) -> Self {
Self {
deque: VecDeque::default(),
seen_counts: HashSet::default(),
deque: VecDeque::new(),
seen_counts: HashSet::new(),
account_id: id.clone(),
message_options: options,
}
}
Expand Down Expand Up @@ -162,54 +166,57 @@ impl MessageQueue {
encrypted.push((encrypted_msg, (info, msg, instant)));
}

let mut compacted = 0;
let mut tasks = tokio::task::JoinSet::new();
for (id, encrypted) in encrypted {
for partition in partition_ciphered_256kb(encrypted) {
let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip();
let (encrypted_partition, _msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip();
// guaranteed to unwrap due to our previous loop check:
let info = participants.get(&Participant::from(id)).unwrap();
let id = Participant::from(id);
let info = participants.get(&id).unwrap();
let account_id = &info.account_id;

let start = Instant::now();
crate::metrics::NUM_SEND_ENCRYPTED_TOTAL
.with_label_values(&[account_id.as_str()])
.inc();
if let Err(err) = send_encrypted(

tasks.spawn(send_encrypted(
from,
client,
&info.url,
client.clone(),
info.url.clone(),
encrypted_partition,
Duration::from_millis(self.message_options.timeout),
)
.await
{
crate::metrics::NUM_SEND_ENCRYPTED_FAILURE
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved
.with_label_values(&[account_id.as_str()])
.inc();
crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY
.with_label_values(&[account_id.as_str()])
.observe(start.elapsed().as_millis() as f64);

// since we failed, put back all the messages related to this
failed.extend(msgs);
));
}
}

let mut compacted = 0;
while let Some(result) = tasks.join_next().await {
match result {
Ok(result) => {
let Err(err) = result else {
compacted += 1;
continue;
};
errors.push(err);
} else {
compacted += msgs.len();
crate::metrics::SEND_ENCRYPTED_LATENCY
.with_label_values(&[account_id.as_str()])
.observe(start.elapsed().as_millis() as f64);
}
Err(err) => {
tracing::error!(?err, "message queue task failure");
}
}
}

let elapsed = outer.elapsed();
if uncompacted > 0 {
tracing::info!(
uncompacted,
compacted,
"{from:?} sent messages in {:?};",
outer.elapsed()
"{from:?} sent messages in {elapsed:?}",
);
}
crate::metrics::SEND_ENCRYPTED_LATENCY
.with_label_values(&[self.account_id.as_str()])
.observe(elapsed.as_millis() as f64);

// only add the participant count if it hasn't been seen before.
let counts = format!("{participant_counter:?}");
if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) {
Expand Down
19 changes: 0 additions & 19 deletions chain-signatures/node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,6 @@ pub(crate) static PROTOCOL_LATENCY_ITER_MESSAGE: Lazy<HistogramVec> = Lazy::new(
.unwrap()
});

/// Counter of failed `send_encrypted` attempts.
///
/// Exposed as `multichain_send_encrypted_failure` with a single
/// `node_account_id` label. The send path increments it once for every
/// `send_encrypted` call that returns an error.
///
/// # Panics
///
/// Panics on first access if `try_create_counter_vec` does not succeed.
pub(crate) static NUM_SEND_ENCRYPTED_FAILURE: Lazy<CounterVec> = Lazy::new(|| {
    try_create_counter_vec(
        "multichain_send_encrypted_failure",
        // The help text previously said "successful", which contradicted the
        // metric name and the fact that it is only bumped on the error path.
        "number of failed send encrypted",
        &["node_account_id"],
    )
    .unwrap()
});

pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_send_encrypted_total",
Expand All @@ -325,16 +316,6 @@ pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
.unwrap()
});

/// Histogram of how long failed `send_encrypted` calls took.
///
/// Exposed as `multichain_failed_send_encrypted_ms` with a single
/// `node_account_id` label. The send path observes the elapsed time in
/// milliseconds whenever `send_encrypted` returns an error.
///
/// # Panics
///
/// Panics on first access if either the bucket construction or
/// `try_create_histogram_vec` does not succeed.
pub(crate) static FAILED_SEND_ENCRYPTED_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
    // NOTE(review): presumably prometheus' `exponential_buckets(start, factor, count)`,
    // i.e. 20 buckets starting at 0.5 and growing by 1.5x — confirm against the import.
    let latency_buckets = exponential_buckets(0.5, 1.5, 20).unwrap();
    let histogram = try_create_histogram_vec(
        "multichain_failed_send_encrypted_ms",
        "Latency of failed send encrypted.",
        &["node_account_id"],
        Some(latency_buckets),
    );
    histogram.unwrap()
});

pub(crate) static NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_num_total_historical_signature_generators",
Expand Down
3 changes: 3 additions & 0 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl ConsensusProtocol for StartedState {
),
)),
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down Expand Up @@ -229,6 +230,7 @@ impl ConsensusProtocol for StartedState {
threshold: contract_state.threshold,
protocol,
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down Expand Up @@ -767,6 +769,7 @@ async fn start_resharing<C: ConsensusCtx>(
public_key: contract_state.public_key,
protocol,
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down
7 changes: 5 additions & 2 deletions chain-signatures/node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,17 @@ impl MpcSignProtocol {
crate::metrics::PROTOCOL_ITER_CNT
.with_label_values(&[my_account_id.as_str()])
.inc();

let msg_time = Instant::now();
let mut msg_count = 0;
loop {
let msg_result = self.receiver.try_recv();
match msg_result {
Ok(msg) => {
tracing::debug!("received a new message");
msg_count += 1;
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved
queue.push(msg);
}
Err(TryRecvError::Empty) => {
tracing::debug!("no new messages received");
break;
}
Err(TryRecvError::Disconnected) => {
Expand All @@ -266,6 +268,7 @@ impl MpcSignProtocol {
}
}
}
tracing::debug!("received {msg_count} messages in {:?}", msg_time.elapsed());
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved

let contract_state = if last_state_update.elapsed() > Duration::from_secs(1) {
let contract_state = match rpc_client::fetch_mpc_contract_state(
Expand Down
Loading