Skip to content

Commit

Permalink
[Bridge Node] Better optimize sig aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Oct 4, 2024
1 parent 8761bf8 commit ccfd466
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 30 deletions.
106 changes: 77 additions & 29 deletions crates/sui-authority-aggregation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use futures::Future;
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use mysten_metrics::monitored_future;

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use sui_types::base_types::ConciseableName;
Expand Down Expand Up @@ -40,7 +41,8 @@ pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
initial_state: S,
map_each_authority: FMap,
reduce_result: FReduce,
initial_timeout: Duration,
max_timeout: Duration,
min_timeout: Option<Duration>,
) -> Result<
(
R,
Expand All @@ -49,7 +51,7 @@ pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
S,
>
where
K: Ord + ConciseableName<'a> + Clone + 'a,
K: Ord + ConciseableName<'a> + Clone + 'a + Hash,
C: CommitteeTrait<K>,
FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
Expand All @@ -58,6 +60,7 @@ where

// First, execute in parallel for each authority FMap.
let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
.clone()
.into_iter()
.map(|name| {
let client = authority_clients[&name].clone();
Expand All @@ -66,33 +69,77 @@ where
})
.collect();

let mut current_timeout = initial_timeout;
let mut accumulated_state = initial_state;
// Then, as results become available fold them into the state using FReduce.
while let Ok(Some((authority_name, result))) = timeout(current_timeout, responses.next()).await
{
let authority_weight = committee.weight(&authority_name);
accumulated_state =
match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
// In the first two cases we are told to continue the iteration.
ReduceOutput::Continue(state) => state,
ReduceOutput::ContinueWithTimeout(state, duration) => {
// Adjust the waiting timeout.
current_timeout = duration;
state
}
ReduceOutput::Failed(state) => {
return Err(state);
}
ReduceOutput::Success(result) => {
// The reducer tells us that we have the result needed. Just return it.
return Ok((result, responses));
}
match min_timeout {
Some(min_timeout) => {
let mut accumulated_state = initial_state;
let mut authority_to_result: HashMap<K, Result<V, E>> = HashMap::new();

while let Ok(Some((authority_name, result))) =
timeout(min_timeout, responses.next()).await
{
authority_to_result.insert(authority_name.clone(), result);
}
for authority_name in authorities_shuffled {
let authority_weight = committee.weight(&authority_name);
let result = authority_to_result.remove(&authority_name).unwrap();
accumulated_state = match reduce_result(
accumulated_state,
authority_name,
authority_weight,
result,
)
.await
{
// In the first two cases we are told to continue the iteration.
ReduceOutput::Continue(state) => state,
ReduceOutput::ContinueWithTimeout(state, _duration) => state,
ReduceOutput::Failed(state) => {
return Err(state);
}
ReduceOutput::Success(result) => {
// The reducer tells us that we have the result needed. Just return it.
return Ok((result, responses));
}
};
}
// If we have exhausted all authorities and still have not returned a result, return
// error with the accumulated state.
Err(accumulated_state)
}
None => {
// As results become available fold them into the state using FReduce.
let mut current_timeout = max_timeout;
let mut accumulated_state = initial_state;

while let Ok(Some((authority_name, result))) =
timeout(current_timeout, responses.next()).await
{
let authority_weight = committee.weight(&authority_name);
accumulated_state =
match reduce_result(accumulated_state, authority_name, authority_weight, result)
.await
{
// In the first two cases we are told to continue the iteration.
ReduceOutput::Continue(state) => state,
ReduceOutput::ContinueWithTimeout(state, duration) => {
// Adjust the waiting timeout.
current_timeout = duration;
state
}
ReduceOutput::Failed(state) => {
return Err(state);
}
ReduceOutput::Success(result) => {
// The reducer tells us that we have the result needed. Just return it.
return Ok((result, responses));
}
}
}
// If we have exhausted all authorities and still have not returned a result, return
// error with the accumulated state.
Err(accumulated_state)
}
}
// If we have exhausted all authorities and still have not returned a result, return
// error with the accumulated state.
Err(accumulated_state)
}

/// This function takes an initial state, than executes an asynchronous function (FMap) for each
Expand Down Expand Up @@ -142,7 +189,7 @@ pub async fn quorum_map_then_reduce_with_timeout<
S,
>
where
K: Ord + ConciseableName<'a> + Clone + 'a,
K: Ord + ConciseableName<'a> + Clone + 'a + Hash,
C: CommitteeTrait<K>,
FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
Expand All @@ -155,6 +202,7 @@ where
map_each_authority,
reduce_result,
initial_timeout,
None,
)
.await
}
6 changes: 5 additions & 1 deletion crates/sui-bridge/src/client/bridge_authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use sui_types::committee::StakeUnit;
use sui_types::committee::TOTAL_VOTING_POWER;
use tracing::{error, info, warn};

const MAX_TIMEOUT_MS: u64 = 5000;
const MIN_TIMEOUT_MS: u64 = 1500;

pub struct BridgeAuthorityAggregator {
pub committee: Arc<BridgeCommittee>,
pub clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
Expand Down Expand Up @@ -235,7 +238,8 @@ async fn request_sign_bridge_action_into_certification(
})
},
// A herustic timeout, we expect the signing to finish within 5 seconds
Duration::from_secs(5),
Duration::from_secs(MAX_TIMEOUT_MS),
Some(Duration::from_secs(MIN_TIMEOUT_MS)),
)
.await
.map_err(|state| {
Expand Down

0 comments on commit ccfd466

Please sign in to comment.