Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimising validator tx finalizer #19272

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ impl AuthorityState {
let cache_reader = self.get_transaction_cache_reader().clone();
let epoch_store = epoch_store.clone();
spawn_monitored_task!(epoch_store.within_alive_epoch(
validator_tx_finalizer.track_signed_tx(cache_reader, tx)
validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
));
}
}
Expand Down
20 changes: 13 additions & 7 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ pub struct AuthorityPerEpochStore {
/// We need to keep track of those in order to know when to send EndOfPublish message.
/// Lock ordering: this is a 'leaf' lock, no other locks should be acquired in the scope of this lock
/// In particular, this lock is always acquired after taking read or write lock on reconfig state
pending_consensus_certificates: Mutex<HashSet<TransactionDigest>>,
pending_consensus_certificates: RwLock<HashSet<TransactionDigest>>,

/// MutexTable for transaction locks (prevent concurrent execution of same transaction)
mutex_table: MutexTable<TransactionDigest>,
Expand Down Expand Up @@ -876,7 +876,7 @@ impl AuthorityPerEpochStore {
running_root_notify_read: NotifyRead::new(),
executed_digests_notify_read: NotifyRead::new(),
end_of_publish: Mutex::new(end_of_publish),
pending_consensus_certificates: Mutex::new(pending_consensus_certificates),
pending_consensus_certificates: RwLock::new(pending_consensus_certificates),
mutex_table: MutexTable::new(MUTEX_TABLE_SIZE),
epoch_open_time: current_time,
epoch_close_time: Default::default(),
Expand Down Expand Up @@ -1889,7 +1889,7 @@ impl AuthorityPerEpochStore {
"Reconfiguration state should allow accepting user transactions"
);
self.pending_consensus_certificates
.lock()
.write()
.insert(*cert.digest());
}
}
Expand All @@ -1906,22 +1906,28 @@ impl AuthorityPerEpochStore {
// TODO: lock once for all remove() calls.
for key in keys {
if let ConsensusTransactionKey::Certificate(cert) = key {
self.pending_consensus_certificates.lock().remove(cert);
self.pending_consensus_certificates.write().remove(cert);
}
}
Ok(())
}

pub fn pending_consensus_certificates_count(&self) -> usize {
self.pending_consensus_certificates.lock().len()
self.pending_consensus_certificates.read().len()
}

pub fn pending_consensus_certificates_empty(&self) -> bool {
self.pending_consensus_certificates.lock().is_empty()
self.pending_consensus_certificates.read().is_empty()
}

pub fn pending_consensus_certificates(&self) -> HashSet<TransactionDigest> {
self.pending_consensus_certificates.lock().clone()
self.pending_consensus_certificates.read().clone()
}

/// Returns `true` if `tx_digest` is currently contained in the
/// `pending_consensus_certificates` set, `false` otherwise.
///
/// Only the read side of the lock is taken, and only for this single
/// membership lookup; the set itself is not modified.
pub fn is_pending_consensus_certificate(&self, tx_digest: &TransactionDigest) -> bool {
self.pending_consensus_certificates
.read()
.contains(tx_digest)
}

pub fn deferred_transactions_empty(&self) -> bool {
Expand Down
29 changes: 24 additions & 5 deletions crates/sui-core/src/validator_tx_finalizer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority_aggregator::AuthorityAggregator;
use crate::authority_client::AuthorityAPI;
use crate::execution_cache::TransactionCacheRead;
Expand Down Expand Up @@ -177,11 +178,15 @@ where
pub async fn track_signed_tx(
&self,
cache_read: Arc<dyn TransactionCacheRead>,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx: VerifiedSignedTransaction,
) {
let tx_digest = *tx.digest();
trace!(?tx_digest, "Tracking signed transaction");
match self.delay_and_finalize_tx(cache_read, tx).await {
match self
.delay_and_finalize_tx(cache_read, epoch_store, tx)
.await
{
Ok(did_run) => {
if did_run {
debug!(?tx_digest, "Transaction finalized");
Expand All @@ -196,6 +201,7 @@ where
async fn delay_and_finalize_tx(
&self,
cache_read: Arc<dyn TransactionCacheRead>,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx: VerifiedSignedTransaction,
) -> anyhow::Result<bool> {
let tx_digest = *tx.digest();
Expand All @@ -213,6 +219,14 @@ where
}
}

if epoch_store.is_pending_consensus_certificate(&tx_digest) {
trace!(
?tx_digest,
"Transaction has been submitted to consensus, no need to help drive finality"
);
return Ok(false);
}

self.metrics
.validator_tx_finalizer_attempt_delay
.observe(tx_finalization_delay.as_secs_f64());
Expand Down Expand Up @@ -419,9 +433,12 @@ mod tests {
let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
let tx_digest = *signed_tx.digest();
let cache_read = states[0].get_transaction_cache_reader().clone();
let epoch_store = states[0].epoch_store_for_testing();
let metrics = finalizer1.metrics.clone();
let handle = tokio::spawn(async move {
finalizer1.track_signed_tx(cache_read, signed_tx).await;
finalizer1
.track_signed_tx(cache_read, &epoch_store, signed_tx)
.await;
});
handle.await.unwrap();
check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
Expand Down Expand Up @@ -452,7 +469,7 @@ mod tests {
let metrics = finalizer1.metrics.clone();
let handle = tokio::spawn(async move {
let _ = epoch_store
.within_alive_epoch(finalizer1.track_signed_tx(cache_read, signed_tx))
.within_alive_epoch(finalizer1.track_signed_tx(cache_read, &epoch_store, signed_tx))
.await;
});
states[0].reconfigure_for_testing().await;
Expand Down Expand Up @@ -499,12 +516,13 @@ mod tests {
let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
let tx_digest = *signed_tx.digest();
let cache_read = states[0].get_transaction_cache_reader().clone();
let epoch_store = states[0].epoch_store_for_testing();

let metrics = finalizer1.metrics.clone();
let signed_tx_clone = signed_tx.clone();
let handle = tokio::spawn(async move {
finalizer1
.track_signed_tx(cache_read, signed_tx_clone)
.track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
.await;
});
auth_agg
Expand Down Expand Up @@ -537,6 +555,7 @@ mod tests {
let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
let tx_digest = *signed_tx.digest();
let cache_read = states[0].get_transaction_cache_reader().clone();
let epoch_store = states[0].epoch_store_for_testing();
for client in clients.values() {
client.inject_fault.store(true, Relaxed);
}
Expand All @@ -545,7 +564,7 @@ mod tests {
let signed_tx_clone = signed_tx.clone();
let handle = tokio::spawn(async move {
finalizer1
.track_signed_tx(cache_read, signed_tx_clone)
.track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
.await;
});
handle.await.unwrap();
Expand Down
Loading