Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make manager reject messages for already completed sigs #773

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain-signatures/contract/src/config/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl Default for SignatureConfig {
fn default() -> Self {
Self {
generation_timeout: secs_to_ms(45),
garbage_timeout: hours_to_ms(24),

other: Default::default(),
}
Expand Down
36 changes: 4 additions & 32 deletions chain-signatures/contract/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ pub struct PresignatureConfig {
pub struct SignatureConfig {
/// Timeout for signature generation in milliseconds.
pub generation_timeout: u64,
/// Garbage collection timeout in milliseconds for signatures generated.
pub garbage_timeout: u64,

/// The remaining entries that can be present in future forms of the configuration.
#[serde(flatten)]
Expand All @@ -95,35 +97,6 @@ mod tests {

#[test]
fn test_load_config() {
let config_str: serde_json::Value = serde_json::from_str(
r#"{
"protocol": {
"message_timeout": 10000,
"garbage_timeout": 20000,
"max_concurrent_introduction": 10,
"max_concurrent_generation": 10,
"triple": {
"min_triples": 10,
"max_triples": 100,
"generation_timeout": 10000
},
"presignature": {
"min_presignatures": 10,
"max_presignatures": 100,
"generation_timeout": 10000
},
"signature": {
"generation_timeout": 10000
},
"string": "value",
"integer": 1000
},
"string": "value2",
"integer": 20
}"#,
)
.unwrap();

let config_macro = serde_json::json!({
"protocol": {
"message_timeout": 10000,
Expand All @@ -141,7 +114,8 @@ mod tests {
"generation_timeout": 10000
},
"signature": {
"generation_timeout": 10000
"generation_timeout": 10000,
"garbage_timeout": 10000000
},
"string": "value",
"integer": 1000
Expand All @@ -150,8 +124,6 @@ mod tests {
"integer": 20
});

assert_eq!(config_str, config_macro);

let config: Config = serde_json::from_value(config_macro).unwrap();
assert_eq!(config.protocol.message_timeout, 10000);
assert_eq!(config.get("integer").unwrap(), serde_json::json!(20));
Expand Down
5 changes: 3 additions & 2 deletions chain-signatures/node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl MessageHandler for RunningState {

let mut signature_manager = self.signature_manager.write().await;
let signature_messages = queue.signature_bins.entry(self.epoch).or_default();
signature_messages.retain(|_, queue| {
signature_messages.retain(|receipt_id, queue| {
// Skip message if it already timed out
if queue.is_empty()
|| queue.iter().any(|msg| {
Expand All @@ -377,7 +377,8 @@ impl MessageHandler for RunningState {
{
return false;
}
true

!signature_manager.refresh_gc(receipt_id)
});
for (receipt_id, queue) in signature_messages {
// SAFETY: this unwrap() is safe since we have already checked that the queue is not empty.
Expand Down
43 changes: 26 additions & 17 deletions chain-signatures/node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ use std::time::{Duration, Instant};

use near_account_id::AccountId;
use near_fetch::signer::SignerExt;
use near_primitives::hash::CryptoHash;

pub type ReceiptId = near_primitives::hash::CryptoHash;

pub struct SignRequest {
pub receipt_id: CryptoHash,
pub receipt_id: ReceiptId,
pub request: ContractSignRequest,
pub epsilon: Scalar,
pub delta: Scalar,
Expand All @@ -37,7 +38,7 @@ pub struct SignRequest {
#[derive(Default)]
pub struct SignQueue {
unorganized_requests: Vec<SignRequest>,
requests: HashMap<Participant, HashMap<CryptoHash, SignRequest>>,
requests: HashMap<Participant, HashMap<ReceiptId, SignRequest>>,
}

impl SignQueue {
Expand Down Expand Up @@ -108,14 +109,14 @@ impl SignQueue {
}
}

pub fn contains(&self, participant: Participant, receipt_id: CryptoHash) -> bool {
pub fn contains(&self, participant: Participant, receipt_id: ReceiptId) -> bool {
let Some(participant_requests) = self.requests.get(&participant) else {
return false;
};
participant_requests.contains_key(&receipt_id)
}

pub fn my_requests(&mut self, me: Participant) -> &mut HashMap<CryptoHash, SignRequest> {
pub fn my_requests(&mut self, me: Participant) -> &mut HashMap<ReceiptId, SignRequest> {
self.requests.entry(me).or_default()
}
}
Expand Down Expand Up @@ -185,11 +186,11 @@ pub struct GenerationRequest {

pub struct SignatureManager {
/// Ongoing signature generation protocols.
generators: HashMap<CryptoHash, SignatureGenerator>,
generators: HashMap<ReceiptId, SignatureGenerator>,
/// Failed signatures awaiting to be retried.
failed: VecDeque<(CryptoHash, GenerationRequest)>,
failed: VecDeque<(ReceiptId, GenerationRequest)>,
/// Set of completed signatures
completed: HashMap<PresignatureId, Instant>,
completed: HashMap<ReceiptId, Instant>,
/// Generated signatures assigned to the current node that are yet to be published.
/// Vec<(receipt_id, msg_hash, timestamp, output)>
signatures: Vec<ToPublish>,
Expand All @@ -200,7 +201,7 @@ pub struct SignatureManager {

pub const MAX_RETRY: u8 = 10;
pub struct ToPublish {
receipt_id: CryptoHash,
receipt_id: ReceiptId,
request: SignatureRequest,
time_added: Instant,
signature: FullSignature<Secp256k1>,
Expand All @@ -209,7 +210,7 @@ pub struct ToPublish {

impl ToPublish {
pub fn new(
receipt_id: CryptoHash,
receipt_id: ReceiptId,
request: SignatureRequest,
time_added: Instant,
signature: FullSignature<Secp256k1>,
Expand Down Expand Up @@ -291,7 +292,7 @@ impl SignatureManager {

fn retry_failed_generation(
&mut self,
receipt_id: CryptoHash,
receipt_id: ReceiptId,
req: GenerationRequest,
presignature: Presignature,
participants: &Participants,
Expand All @@ -315,7 +316,7 @@ impl SignatureManager {
pub fn generate(
&mut self,
participants: &Participants,
receipt_id: CryptoHash,
receipt_id: ReceiptId,
presignature: Presignature,
request: ContractSignRequest,
epsilon: Scalar,
Expand Down Expand Up @@ -358,7 +359,7 @@ impl SignatureManager {
pub fn get_or_generate(
&mut self,
participants: &Participants,
receipt_id: CryptoHash,
receipt_id: ReceiptId,
proposer: Participant,
presignature_id: PresignatureId,
request: &ContractSignRequest,
Expand All @@ -367,7 +368,7 @@ impl SignatureManager {
presignature_manager: &mut PresignatureManager,
cfg: &ProtocolConfig,
) -> Result<&mut SignatureProtocol, GenerationError> {
if self.completed.contains_key(&presignature_id) {
if self.completed.contains_key(&receipt_id) {
tracing::warn!(%receipt_id, presignature_id, "presignature has already been used to generate a signature");
return Err(GenerationError::AlreadyGenerated);
}
Expand Down Expand Up @@ -490,7 +491,7 @@ impl SignatureManager {
s = ?output.s,
"completed signature generation"
);
self.completed.insert(generator.presignature_id, Instant::now());
self.completed.insert(*receipt_id, Instant::now());
let request = SignatureRequest {
epsilon: SerializableScalar {scalar: generator.epsilon},
payload_hash: generator.request.payload.into(),
Expand All @@ -512,7 +513,7 @@ impl SignatureManager {
&mut self,
threshold: usize,
stable: &Participants,
my_requests: &mut HashMap<CryptoHash, SignRequest>,
my_requests: &mut HashMap<ReceiptId, SignRequest>,
presignature_manager: &mut PresignatureManager,
cfg: &ProtocolConfig,
) {
Expand Down Expand Up @@ -677,7 +678,15 @@ impl SignatureManager {
/// Garbage collect all the completed signatures.
pub fn garbage_collect(&mut self, cfg: &ProtocolConfig) {
self.completed.retain(|_, timestamp| {
timestamp.elapsed() < Duration::from_millis(cfg.garbage_timeout)
timestamp.elapsed() < Duration::from_millis(cfg.signature.garbage_timeout)
});
}

pub fn refresh_gc(&mut self, id: &ReceiptId) -> bool {
let entry = self
.completed
.entry(*id)
.and_modify(|e| *e = Instant::now());
matches!(entry, Entry::Occupied(_))
}
}
Loading