Skip to content

Commit

Permalink
Improve tx finalizer simtests (#18903)
Browse files Browse the repository at this point in the history
## Description 

Introduce a timing config, and set the values differently for prod vs
tests.
This allows tests to run much more quickly.
Re-enable the simtests in nightly.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Aug 7, 2024
1 parent 1915892 commit 2261032
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 87 deletions.
122 changes: 57 additions & 65 deletions crates/sui-core/src/validator_tx_finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,6 @@ use tokio::select;
use tokio::time::Instant;
use tracing::{debug, error, trace};

/// Only wake up the transaction finalization task for a given transaction
/// 1 minute after first seeing it. This gives plenty of time for the transaction
/// to become final in the normal way. We also don't want this delay to be too long,
/// to limit the memory held by the pending finalizer tasks.
const TX_FINALIZATION_DELAY: Duration = Duration::from_secs(60);
/// If a transaction cannot be finalized within 1 minute of being woken up, give up.
const FINALIZATION_TIMEOUT: Duration = Duration::from_secs(60);

/// Incremental delay, in seconds, for validators to wake up to finalize a transaction.
/// It is multiplied by the validator's position and added to the base delay.
const VALIDATOR_DELAY_INCREMENTS_SEC: u64 = 10;

/// Upper bound on the total wake-up delay (base delay plus the position-based
/// increment); the computed delay is clamped to this value.
const VALIDATOR_MAX_DELAY: Duration = Duration::from_secs(180);

struct ValidatorTxFinalizerMetrics {
num_finalization_attempts: IntCounter,
num_successful_finalizations: IntCounter,
Expand Down Expand Up @@ -100,15 +87,51 @@ impl ValidatorTxFinalizerMetrics {
}
}

/// Timing parameters that control when a validator wakes up to finalize a
/// transaction it has signed, and how long a finalization attempt may run.
///
/// The [`Default`] implementation is selected at compile time: regular builds
/// get the production timings, while `cfg(msim)` / `cfg(test)` builds get much
/// shorter ones so that tests run quickly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidatorTxFinalizerConfig {
    /// Base delay between seeing a transaction and waking up to finalize it.
    pub tx_finalization_delay: Duration,
    /// Time budget for a single finalization attempt; the attempt is abandoned
    /// once this elapses.
    pub tx_finalization_timeout: Duration,
    /// Incremental delay for validators to wake up to finalize a transaction.
    /// Expressed in seconds; it is multiplied by the validator's position and
    /// added on top of `tx_finalization_delay`.
    pub validator_delay_increments_sec: u64,
    /// Upper bound on the total wake-up delay (base delay plus increment).
    pub validator_max_delay: Duration,
}

/// Production timings.
#[cfg(not(any(msim, test)))]
impl Default for ValidatorTxFinalizerConfig {
    fn default() -> Self {
        Self {
            // Only wake up the transaction finalization task for a given transaction
            // 1 minute after first seeing it. This gives plenty of time for the
            // transaction to become final in the normal way. We also don't want this
            // delay to be too long, to limit the memory held by pending finalizer tasks.
            tx_finalization_delay: Duration::from_secs(60),
            // If a transaction cannot be finalized within 1 minute of being woken up,
            // give up.
            tx_finalization_timeout: Duration::from_secs(60),
            validator_delay_increments_sec: 10,
            validator_max_delay: Duration::from_secs(180),
        }
    }
}

/// Shortened timings for unit tests and simtests, so that they do not have to
/// wait out the production delays.
#[cfg(any(msim, test))]
impl Default for ValidatorTxFinalizerConfig {
    fn default() -> Self {
        Self {
            tx_finalization_delay: Duration::from_secs(5),
            // Kept at the production value: only the wake-up delays are shortened.
            tx_finalization_timeout: Duration::from_secs(60),
            validator_delay_increments_sec: 1,
            validator_max_delay: Duration::from_secs(15),
        }
    }
}

/// The `ValidatorTxFinalizer` is responsible for finalizing transactions that
/// have been signed by the validator. It does this by waiting for a delay
/// after the transaction has been signed, and then attempting to finalize
/// the transaction if it has not yet been done by a fullnode.
pub struct ValidatorTxFinalizer<C: Clone> {
agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
name: AuthorityName,
tx_finalization_delay: Duration,
finalization_timeout: Duration,
config: Arc<ValidatorTxFinalizerConfig>,
metrics: Arc<ValidatorTxFinalizerMetrics>,
}

Expand All @@ -121,8 +144,7 @@ impl<C: Clone> ValidatorTxFinalizer<C> {
Self {
agg,
name,
tx_finalization_delay: TX_FINALIZATION_DELAY,
finalization_timeout: FINALIZATION_TIMEOUT,
config: Arc::new(ValidatorTxFinalizerConfig::default()),
metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
}
}
Expand All @@ -131,16 +153,8 @@ impl<C: Clone> ValidatorTxFinalizer<C> {
pub(crate) fn new_for_testing(
agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
name: AuthorityName,
tx_finalization_delay: Duration,
finalization_timeout: Duration,
) -> Self {
Self {
agg,
name,
tx_finalization_delay,
finalization_timeout,
metrics: Arc::new(ValidatorTxFinalizerMetrics::new(&Registry::new())),
}
Self::new(agg, name, &Registry::new())
}

#[cfg(test)]
Expand Down Expand Up @@ -208,7 +222,7 @@ where
"Invoking authority aggregator to finalize transaction"
);
tokio::time::timeout(
self.finalization_timeout,
self.config.tx_finalization_timeout,
self.agg
.load()
.execute_transaction_block(tx.into_unsigned().inner(), None),
Expand All @@ -234,11 +248,12 @@ where
return None;
};
// TODO: As an optimization, we could also limit the number of validators that would do this.
let extra_delay = position as u64 * VALIDATOR_DELAY_INCREMENTS_SEC;
let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
let delay = self
.config
.tx_finalization_delay
.add(Duration::from_secs(extra_delay));
Some(min(delay, VALIDATOR_MAX_DELAY))
Some(min(delay, self.config.validator_max_delay))
}
}

Expand All @@ -248,10 +263,9 @@ mod tests {
use crate::authority::AuthorityState;
use crate::authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder};
use crate::authority_client::AuthorityAPI;
use crate::validator_tx_finalizer::{ValidatorTxFinalizer, VALIDATOR_MAX_DELAY};
use crate::validator_tx_finalizer::ValidatorTxFinalizer;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use prometheus::Registry;
use std::cmp::min;
use std::collections::BTreeMap;
use std::iter;
Expand All @@ -260,7 +274,6 @@ mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;
use sui_macros::sim_test;
use sui_swarm_config::network_config_builder::ConfigBuilder;
use sui_test_transaction_builder::TestTransactionBuilder;
Expand Down Expand Up @@ -402,12 +415,7 @@ mod tests {
let gas_object = Object::with_owner_for_testing(sender);
let gas_object_id = gas_object.id();
let (states, auth_agg, clients) = create_validators(gas_object).await;
let finalizer1 = ValidatorTxFinalizer::new_for_testing(
auth_agg.clone(),
states[0].name,
Duration::from_secs(1),
Duration::from_secs(60),
);
let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
let tx_digest = *signed_tx.digest();
let cache_read = states[0].get_transaction_cache_reader().clone();
Expand Down Expand Up @@ -435,12 +443,7 @@ mod tests {
let gas_object = Object::with_owner_for_testing(sender);
let gas_object_id = gas_object.id();
let (states, auth_agg, clients) = create_validators(gas_object).await;
let finalizer1 = ValidatorTxFinalizer::new_for_testing(
auth_agg.clone(),
states[0].name,
Duration::from_secs(10),
Duration::from_secs(60),
);
let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
let tx_digest = *signed_tx.digest();
let epoch_store = states[0].epoch_store_for_testing();
Expand Down Expand Up @@ -472,12 +475,7 @@ mod tests {
let (sender, _) = get_account_key_pair();
let gas_object = Object::with_owner_for_testing(sender);
let (states, auth_agg, _clients) = create_validators(gas_object).await;
let finalizer1 = ValidatorTxFinalizer::new_for_testing(
auth_agg.clone(),
states[0].name,
Duration::from_secs(10),
Duration::from_secs(60),
);
let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
let mut new_auth_agg = (**auth_agg.load()).clone();
let mut new_committee = (*new_auth_agg.committee).clone();
new_committee.epoch = 100;
Expand All @@ -497,12 +495,7 @@ mod tests {
let gas_object = Object::with_owner_for_testing(sender);
let gas_object_id = gas_object.id();
let (states, auth_agg, clients) = create_validators(gas_object).await;
let finalizer1 = ValidatorTxFinalizer::new_for_testing(
auth_agg.clone(),
states[0].name,
Duration::from_secs(20),
Duration::from_secs(60),
);
let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
let tx_digest = *signed_tx.digest();
let cache_read = states[0].get_transaction_cache_reader().clone();
Expand Down Expand Up @@ -540,12 +533,7 @@ mod tests {
let gas_object = Object::with_owner_for_testing(sender);
let gas_object_id = gas_object.id();
let (states, auth_agg, clients) = create_validators(gas_object).await;
let finalizer1 = ValidatorTxFinalizer::new_for_testing(
auth_agg.clone(),
states[0].name,
Duration::from_secs(10),
Duration::from_secs(30),
);
let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
let tx_digest = *signed_tx.digest();
let cache_read = states[0].get_transaction_cache_reader().clone();
Expand Down Expand Up @@ -585,13 +573,13 @@ mod tests {
let auth_agg = Arc::new(auth_agg);
let finalizers = (0..COMMITTEE_SIZE)
.map(|idx| {
ValidatorTxFinalizer::new(
ValidatorTxFinalizer::new_for_testing(
Arc::new(ArcSwap::new(auth_agg.clone())),
auth_agg.committee.voting_rights[idx].0,
&Registry::new(),
)
})
.collect::<Vec<_>>();
let config = finalizers[0].config.clone();
for _ in 0..100 {
let tx_digest = TransactionDigest::random();
let mut delays: Vec<_> = finalizers
Expand All @@ -607,7 +595,11 @@ mod tests {
for (idx, delay) in delays.iter().enumerate() {
assert_eq!(
*delay,
min(VALIDATOR_MAX_DELAY.as_secs(), 60 + idx as u64 * 10)
min(
config.validator_max_delay.as_secs(),
config.tx_finalization_delay.as_secs()
+ idx as u64 * config.validator_delay_increments_sec
)
);
}
}
Expand Down
21 changes: 16 additions & 5 deletions crates/sui-e2e-tests/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use sui_types::sui_system_state::{
get_validator_from_table, sui_system_state_summary::get_validator_by_pool_id,
SuiSystemStateTrait,
};
use sui_types::transaction::{TransactionDataAPI, TransactionExpiration};
use sui_types::transaction::{TransactionDataAPI, TransactionExpiration, VerifiedTransaction};
use test_cluster::{TestCluster, TestClusterBuilder};
use tokio::time::sleep;

Expand Down Expand Up @@ -297,7 +297,7 @@ async fn do_test_passive_reconfig() {
});
}

// Test for syncing a node to an authority that already has many txes.
// Test that transaction locks from previous epochs can be overridden.
#[sim_test]
async fn test_expired_locks() {
let test_cluster = TestClusterBuilder::new()
Expand All @@ -324,13 +324,24 @@ async fn test_expired_locks() {
};

let t1 = transfer_sui(1);
// attempt to equivocate
let t2 = transfer_sui(2);

for (idx, validator) in test_cluster.all_validator_handles().into_iter().enumerate() {
let state = validator.state();
let epoch_store = state.epoch_store_for_testing();
let t = if idx % 2 == 0 { t1.clone() } else { t2.clone() };
validator
.state()
.handle_transaction(&epoch_store, VerifiedTransaction::new_unchecked(t))
.await
.unwrap();
}
test_cluster
.create_certificate(t1.clone(), None)
.await
.unwrap();
.unwrap_err();

// attempt to equivocate
let t2 = transfer_sui(2);
test_cluster
.create_certificate(t2.clone(), None)
.await
Expand Down
53 changes: 37 additions & 16 deletions crates/sui-e2e-tests/tests/validator_tx_finalizer_e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,23 @@ async fn test_validator_tx_finalizer_fastpath_tx() {
.process_transaction(tx, None)
.await
.unwrap();
// Validators wait for 60s before the first one wakes up. Since 2f+1 signed the tx, i.e.
// 5 validators have signed the tx, in the worst case where the other 2 wake up first,
// it would take 60 + 3 * 10 = 90s for a validator to finalize this.
tokio::time::sleep(Duration::from_secs(120)).await;
for node in cluster.all_node_handles() {
node.with(|n| assert!(n.state().is_tx_already_executed(&tx_digest).unwrap()));
}
// Since 2f+1 signed the tx, i.e. 5 validators have signed the tx, in the worst case where the other 2 wake up first,
// it would take 5 + 3 * 1 = 8s for a validator to finalize this.
let tx_digests = [tx_digest];
tokio::time::timeout(Duration::from_secs(60), async move {
for node in cluster.all_node_handles() {
node.with_async(|n| async {
n.state()
.get_transaction_cache_reader()
.notify_read_executed_effects_digests(&tx_digests)
.await
.unwrap();
})
.await;
}
})
.await
.unwrap();
}

#[sim_test]
Expand All @@ -59,17 +69,28 @@ async fn test_validator_tx_finalizer_consensus_tx() {
.process_transaction(tx, None)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(120)).await;
for node in cluster.all_node_handles() {
node.with(|n| assert!(n.state().is_tx_already_executed(&tx_digest).unwrap()));
}
let tx_digests = [tx_digest];
tokio::time::timeout(Duration::from_secs(60), async move {
for node in cluster.all_node_handles() {
node.with_async(|n| async {
n.state()
.get_transaction_cache_reader()
.notify_read_executed_effects_digests(&tx_digests)
.await
.unwrap();
})
.await;
}
})
.await
.unwrap();
}

#[cfg(msim)]
#[sim_test]
async fn test_validator_tx_finalizer_equivocation() {
let cluster = TestClusterBuilder::new()
.with_num_validators(4)
.with_num_validators(7)
// Make epoch duration large enough so that reconfig is never triggered.
.with_epoch_duration_ms(1000 * 1000)
.build()
Expand All @@ -90,16 +111,16 @@ async fn test_validator_tx_finalizer_equivocation() {
let tx_digest2 = *tx2.digest();
let auth_agg = cluster.authority_aggregator();
for (idx, client) in auth_agg.authority_clients.values().enumerate() {
if idx < 2 {
if idx % 2 == 0 {
client.handle_transaction(tx1.clone(), None).await.unwrap();
} else {
client.handle_transaction(tx2.clone(), None).await.unwrap();
}
}
// It takes up to 90s for each validator to wake up and finalize the txs once.
// We wait for long enough and check that no validator will spawn another thread
// It takes up to 11s (5 + 6 * 1) for each validator to wake up and finalize the txs once.
// We wait for long enough and check that no validator will spawn a thread
// twice to try to finalize the same txs.
tokio::time::sleep(Duration::from_secs(200)).await;
tokio::time::sleep(Duration::from_secs(30)).await;
for node in cluster.swarm.validator_node_handles() {
node.with(|n| {
let state = n.state();
Expand Down
2 changes: 1 addition & 1 deletion scripts/simtest/simtest-run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ if [ -z "$NUM_CPUS" ]; then
fi

# filter out some tests that give spurious failures.
TEST_FILTER="(not (test(~batch_verification_tests) | test(~test_validator_tx_finalizer_)))"
TEST_FILTER="(not (test(~batch_verification_tests)))"

DATE=$(date +%s)
SEED="$DATE"
Expand Down

0 comments on commit 2261032

Please sign in to comment.