Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cleaning up message protocol for better consistency #686

Merged
merged 8 commits into from
Jul 18, 2024
340 changes: 196 additions & 144 deletions chain-signatures/node/src/protocol/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::cryptography::CryptographicError;
use super::presignature::{self, PresignatureId};
use super::presignature::{GenerationError, PresignatureId};
use super::state::{GeneratingState, NodeState, ResharingState, RunningState};
use super::triple::TripleId;
use crate::gcp::error::SecretStorageError;
Expand All @@ -17,7 +17,6 @@ use near_primitives::hash::CryptoHash;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;

#[async_trait::async_trait]
Expand Down Expand Up @@ -236,45 +235,24 @@ impl MessageHandler for RunningState {

// remove the triple_id that has already failed or taken from the triple_bins
// and refresh the timestamp of failed and taken
queue
.triple_bins
.entry(self.epoch)
.or_default()
.retain(|id, queue| {
if let Some(first_msg) = queue.front() {
// Skip this triple if its message already timed out
if util::is_elapsed_longer_than_timeout(
first_msg.timestamp,
let triple_messages = queue.triple_bins.entry(self.epoch).or_default();
triple_messages.retain(|id, queue| {
if queue.is_empty()
|| queue.iter().any(|msg| {
util::is_elapsed_longer_than_timeout(
msg.timestamp,
crate::types::PROTOCOL_TRIPLE_TIMEOUT,
) {
return false;
}
} else {
return false;
}
let has_failed = triple_manager.failed_triples.contains_key(id);
if has_failed {
triple_manager.failed_triples.insert(*id, Instant::now());
}
let is_taken = triple_manager.taken.contains_key(id);
if is_taken {
triple_manager.taken.insert(*id, Instant::now());
}
!has_failed && !is_taken
});

for (id, queue) in queue.triple_bins.entry(self.epoch).or_default() {
if let Some(first_msg) = queue.front() {
// Skip this triple if its message already timed out
if util::is_elapsed_longer_than_timeout(
first_msg.timestamp,
crate::types::PROTOCOL_TRIPLE_TIMEOUT,
) {
continue;
}
} else {
continue;
)
})
{
return false;
}

// if triple id is in GC, remove these messages because the triple is currently
// being GC'ed, where this particular triple has previously failed or been utilized.
!triple_manager.refresh_gc(id)
});
for (id, queue) in triple_messages {
let protocol = match triple_manager.get_or_generate(*id, participants) {
Ok(protocol) => protocol,
Err(err) => {
Expand All @@ -293,126 +271,200 @@ impl MessageHandler for RunningState {
}

let mut presignature_manager = self.presignature_manager.write().await;
for (id, queue) in queue.presignature_bins.entry(self.epoch).or_default() {
let mut leftover_messages = Vec::new();
while let Some(message) = queue.pop_front() {
// Skip message if it already timed out
if util::is_elapsed_longer_than_timeout(
message.timestamp,
crate::types::PROTOCOL_PRESIG_TIMEOUT,
) {
let presignature_messages = queue.presignature_bins.entry(self.epoch).or_default();
presignature_messages.retain(|id, queue| {
// Skip message if it already timed out
if queue.is_empty()
|| queue.iter().any(|msg| {
util::is_elapsed_longer_than_timeout(
msg.timestamp,
crate::types::PROTOCOL_PRESIG_TIMEOUT,
)
})
{
return false;
}

// if presignature id is in GC, remove these messages because the presignature is currently
// being GC'ed, where this particular presignature has previously failed or been utilized.
!presignature_manager.refresh_gc(id)
});
for (id, queue) in presignature_messages {
// SAFETY: this unwrap() is safe since we have already checked that the queue is not empty.
let PresignatureMessage {
triple0, triple1, ..
} = queue.front().unwrap();

if !queue
.iter()
.all(|msg| triple0 == &msg.triple0 && triple1 == &msg.triple1)
{
// Check that all messages in the queue have the same triple0 and triple1, otherwise this is an
// invalid message, so we should just bin the whole entire protocol and its message for this presignature id.
queue.clear();
continue;
}

let protocol = match presignature_manager
.get_or_generate(
participants,
*id,
*triple0,
*triple1,
&mut triple_manager,
&self.public_key,
&self.private_share,
)
.await
{
Ok(protocol) => protocol,
Err(GenerationError::TripleIsGenerating(_)) => {
// We will go back to this presignature bin later when the triple is generated.
continue;
}

match presignature_manager
.get_or_generate(
participants,
*id,
message.triple0,
message.triple1,
&mut triple_manager,
&self.public_key,
&self.private_share,
)
.await
{
Ok(protocol) => protocol.message(message.from, message.data),
Err(presignature::GenerationError::AlreadyGenerated) => {
tracing::debug!(id, "presignature already generated, nothing left to do")
}
Err(presignature::GenerationError::TripleIsGenerating(_)) => {
// Store the message until triple gets generated
leftover_messages.push(message)
}
Err(presignature::GenerationError::TripleIsMissing(_)) => {
// If a triple is missing, that means our system cannot process this presignature. We will have to bin
// this message and have the other node timeout on that generation.
tracing::warn!(
presignature_id = id,
triple0 = message.triple0,
triple1 = message.triple1,
"unable to process presignature: one or more triples are missing",
);
}
Err(presignature::GenerationError::CaitSithInitializationError(error)) => {
// ignore the message since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
presignature_id = id,
?error,
"unable to initialize incoming presignature protocol"
);
continue;
}
Err(
err @ (GenerationError::AlreadyGenerated
| GenerationError::TripleIsGarbageCollected(_)
| GenerationError::TripleIsMissing(_)),
) => {
// This triple has already been generated or removed from the triple manager, so we will have to bin
// the entirety of the messages we received for this presignature id, and have the other nodes timeout
tracing::debug!(id, ?err, "presignature cannot be generated");
queue.clear();
continue;
}
}
if !leftover_messages.is_empty() {
tracing::warn!(
msg_count = leftover_messages.len(),
"unable to process messages, storing for future"
);
queue.extend(leftover_messages);
Err(GenerationError::CaitSithInitializationError(error)) => {
// ignore these messages since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
presignature_id = id,
?error,
"unable to initialize incoming presignature protocol"
);
queue.clear();
continue;
}
Err(err) => {
tracing::warn!(
presignature_id = id,
?err,
"Unexpected error encounted while generating presignature"
);
queue.clear();
continue;
}
};

while let Some(message) = queue.pop_front() {
protocol.message(message.from, message.data);
}
}

let mut signature_manager = self.signature_manager.write().await;
for (receipt_id, queue) in queue.signature_bins.entry(self.epoch).or_default() {
let mut leftover_messages = Vec::new();
while let Some(message) = queue.pop_front() {
// Skip message if it already timed out
if util::is_elapsed_longer_than_timeout(
message.timestamp,
crate::types::PROTOCOL_SIGNATURE_TIMEOUT,
) {
let signature_messages = queue.signature_bins.entry(self.epoch).or_default();
signature_messages.retain(|_, queue| {
// Skip message if it already timed out
if queue.is_empty()
|| queue.iter().any(|msg| {
util::is_elapsed_longer_than_timeout(
msg.timestamp,
crate::types::PROTOCOL_SIGNATURE_TIMEOUT,
)
})
{
return false;
}
true
});
for (receipt_id, queue) in signature_messages {
// SAFETY: this unwrap() is safe since we have already checked that the queue is not empty.
let SignatureMessage {
proposer,
presignature_id,
request,
epsilon,
delta,
..
} = queue.front().unwrap();

if !queue
.iter()
.all(|msg| presignature_id == &msg.presignature_id)
{
// Check that all messages in the queue have the same triple0 and triple1, otherwise this is an
// invalid message, so we should just bin the whole entire protocol and its message for this presignature id.
queue.clear();
continue;
}

// if !self
// .sign_queue
// .read()
// .await
// .contains(message.proposer, receipt_id.clone())
// {
// leftover_messages.push(message);
// continue;
// };
// TODO: Validate that the message matches our sign_queue
let protocol = match signature_manager.get_or_generate(
participants,
*receipt_id,
*proposer,
*presignature_id,
request,
*epsilon,
*delta,
&mut presignature_manager,
) {
Ok(protocol) => protocol,
Err(GenerationError::PresignatureIsGenerating(_)) => {
// We will revisit this this signature request later when the presignature has been generated.
continue;
}

// TODO: make consistent with presignature manager AlreadyGenerated.
if signature_manager.has_completed(&message.presignature_id) {
tracing::info!(
presignature_id = message.presignature_id,
"signature already generated, nothing left to do"
Err(
err @ (GenerationError::AlreadyGenerated
| GenerationError::PresignatureIsGarbageCollected(_)
| GenerationError::PresignatureIsMissing(_)),
) => {
// We will have to remove the entirety of the messages we received for this signature request,
// and have the other nodes timeout in the following cases:
// - If a presignature is in GC, then it was used already or failed to be produced.
// - If a presignature is missing, that means our system cannot process this signature.
tracing::debug!(%receipt_id, ?err, "signature cannot be generated");
queue.clear();
continue;
}
Err(GenerationError::CaitSithInitializationError(error)) => {
// ignore the whole of the messages since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
?receipt_id,
presignature_id,
?error,
"unable to initialize incoming signature protocol"
);
queue.clear();
continue;
}
// if !self
// .sign_queue
// .read()
// .await
// .contains(message.proposer, receipt_id.clone())
// {
// leftover_messages.push(message);
// continue;
// };
// TODO: Validate that the message matches our sign_queue
match signature_manager.get_or_generate(
participants,
*receipt_id,
message.proposer,
message.presignature_id,
message.request.clone(),
message.epsilon,
message.delta,
&mut presignature_manager,
)? {
Some(protocol) => protocol.message(message.from, message.data),
None => {
// Store the message until we are ready to process it
leftover_messages.push(message)
}
Err(err) => {
tracing::warn!(
?receipt_id,
?err,
"Unexpected error encounted while generating signature"
);
queue.clear();
continue;
}
}
if !leftover_messages.is_empty() {
tracing::warn!(
msg_count = leftover_messages.len(),
"unable to process messages, storing for future"
);
queue.extend(leftover_messages);
};

while let Some(message) = queue.pop_front() {
protocol.message(message.from, message.data);
}
}
triple_manager.clear_failed_triples();
triple_manager.clear_taken();
presignature_manager.clear_taken();
triple_manager.garbage_collect();
presignature_manager.garbage_collect();
signature_manager.garbage_collect();
Ok(())
}
}
Expand Down
Loading
Loading