From 2a4dd176df254ca9db2acdcbcd5e43c4ec5ef314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ceyhun=20=C5=9Een?= Date: Tue, 5 Nov 2024 12:31:47 +0700 Subject: [PATCH] rpc: aggregator: Simplify nonce generation part. --- core/src/rpc/aggregator.rs | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index f3f91e71..9c9b30f6 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -4,11 +4,11 @@ use super::clementine::{ use crate::{ aggregator::Aggregator, musig2::aggregate_nonces, - rpc::clementine::{self, nonce_gen_response, DepositSignSession}, + rpc::clementine::{self, nonce_gen_response, DepositSignSession, NonceGenResponse}, ByteArray66, }; use futures::future::try_join_all; -use tonic::{async_trait, Request, Response, Status}; +use tonic::{async_trait, Request, Response, Status, Streaming}; #[async_trait] impl ClementineAggregator for Aggregator { @@ -22,21 +22,20 @@ impl ClementineAggregator for Aggregator { &self, deposit_params: Request, ) -> Result, Status> { - tracing::info!("Recieved deposit: {:?}", deposit_params); - // generate nonces from all verifiers - let mut nonce_streams = try_join_all(self.verifier_clients.iter().map(|v| { - let mut client = v.clone(); // Clone each client to avoid mutable borrow - // https://github.com/hyperium/tonic/issues/33#issuecomment-538150828 + // Generate nonces from all verifiers. + let mut nonce_streams = try_join_all(self.verifier_clients.iter().map(|client| { + // Clone each client to avoid mutable borrow. + // https://github.com/hyperium/tonic/issues/33#issuecomment-538150828 + let mut client = client.clone(); + async move { - let stream = client.nonce_gen(Request::new(Empty {})).await?; - Ok::<_, Box>(stream.into_inner()) - // Return the stream + let response_stream = client.nonce_gen(Request::new(Empty {})).await?; + + Ok::, Status>(response_stream.into_inner()) } })) - .await - .map_err(|e| Status::internal(format!("Failed to generate nonce streams: {:?}", e)))?; - - tracing::debug!("Generated nonce streams"); + .await?; + tracing::debug!("Nonces are generated."); // Get the first responses from each stream let first_responses = try_join_all(nonce_streams.iter_mut().map(|s| async { @@ -69,7 +68,7 @@ impl ClementineAggregator for Aggregator { async move { let (tx, rx) = tokio::sync::mpsc::channel(4); let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx); - // let x = tokio_stream::iter(1..usize::MAX).map(|i| i.to_string()); + // let x = tokio_stream::iter(1..usize::MAX).map(|i| i.to_string()); let stream = client.deposit_sign(receiver_stream).await?; Ok::<_, Box>((stream.into_inner(), tx)) // Return the stream