Skip to content

Commit

Permalink
rpc: aggregator: Simplify nonce generation part.
Browse files Browse the repository at this point in the history
  • Loading branch information
ceyhunsen committed Nov 5, 2024
1 parent ac1b25a commit 2a4dd17
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions core/src/rpc/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use super::clementine::{
use crate::{
aggregator::Aggregator,
musig2::aggregate_nonces,
rpc::clementine::{self, nonce_gen_response, DepositSignSession},
rpc::clementine::{self, nonce_gen_response, DepositSignSession, NonceGenResponse},
ByteArray66,
};
use futures::future::try_join_all;
use tonic::{async_trait, Request, Response, Status};
use tonic::{async_trait, Request, Response, Status, Streaming};

#[async_trait]
impl ClementineAggregator for Aggregator {
Expand All @@ -22,21 +22,20 @@ impl ClementineAggregator for Aggregator {
&self,
deposit_params: Request<DepositParams>,
) -> Result<Response<RawSignedMoveTx>, Status> {
tracing::info!("Recieved deposit: {:?}", deposit_params);
// generate nonces from all verifiers
let mut nonce_streams = try_join_all(self.verifier_clients.iter().map(|v| {
let mut client = v.clone(); // Clone each client to avoid mutable borrow
// https://github.com/hyperium/tonic/issues/33#issuecomment-538150828
// Generate nonces from all verifiers.
let mut nonce_streams = try_join_all(self.verifier_clients.iter().map(|client| {
// Clone each client to avoid mutable borrow.
// https://github.com/hyperium/tonic/issues/33#issuecomment-538150828
let mut client = client.clone();

async move {
let stream = client.nonce_gen(Request::new(Empty {})).await?;
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(stream.into_inner())
// Return the stream
let response_stream = client.nonce_gen(Request::new(Empty {})).await?;

Ok::<Streaming<NonceGenResponse>, Status>(response_stream.into_inner())
}
}))
.await
.map_err(|e| Status::internal(format!("Failed to generate nonce streams: {:?}", e)))?;

tracing::debug!("Generated nonce streams");
.await?;
tracing::debug!("Nonces are generated.");

// Get the first responses from each stream
let first_responses = try_join_all(nonce_streams.iter_mut().map(|s| async {
Expand Down Expand Up @@ -69,7 +68,7 @@ impl ClementineAggregator for Aggregator {
async move {
let (tx, rx) = tokio::sync::mpsc::channel(4);
let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
// let x = tokio_stream::iter(1..usize::MAX).map(|i| i.to_string());
// let x = tokio_stream::iter(1..usize::MAX).map(|i| i.to_string());
let stream = client.deposit_sign(receiver_stream).await?;
Ok::<_, Box<dyn std::error::Error + Send + Sync>>((stream.into_inner(), tx))
// Return the stream
Expand Down

0 comments on commit 2a4dd17

Please sign in to comment.