Skip to content

Commit

Permalink
Add transaction handler RPC for Mysticeti fastpath (#19401)
Browse files Browse the repository at this point in the history
This will not function until consensus adapter/handler side is
implemented.
  • Loading branch information
aschran authored Sep 29, 2024
1 parent 3864dca commit b3b80f7
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 65 deletions.
110 changes: 98 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,11 @@ pub struct AuthorityMetrics {
batch_size: Histogram,

authority_state_handle_transaction_latency: Histogram,
authority_state_handle_transaction_v2_latency: Histogram,

execute_certificate_latency_single_writer: Histogram,
execute_certificate_latency_shared_object: Histogram,
await_transaction_latency: Histogram,

execute_certificate_with_effects_latency: Histogram,
internal_execution_latency: Histogram,
Expand Down Expand Up @@ -434,8 +436,22 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
authority_state_handle_transaction_v2_latency: register_histogram_with_registry!(
"authority_state_handle_transaction_v2_latency",
"Latency of handling transactions with v2",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
execute_certificate_latency_single_writer,
execute_certificate_latency_shared_object,
await_transaction_latency: register_histogram_with_registry!(
"await_transaction_latency",
"Latency of awaiting user transaction execution, including waiting for inputs",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
execute_certificate_with_effects_latency: register_histogram_with_registry!(
"authority_state_execute_certificate_with_effects_latency",
"Latency of executing certificates with effects, including waiting for inputs",
Expand Down Expand Up @@ -839,17 +855,11 @@ impl AuthorityState {
self.checkpoint_store.get_epoch_state_commitments(epoch)
}

/// This is a private method and should be kept that way. It doesn't check whether
/// the provided transaction is a system transaction, and hence can only be called internally.
#[instrument(level = "trace", skip_all)]
async fn handle_transaction_impl(
fn handle_transaction_deny_checks(
&self,
transaction: VerifiedTransaction,
transaction: &VerifiedTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransaction> {
// Ensure that validator cannot reconfigure while we are signing the tx
let _execution_lock = self.execution_lock_for_signing().await;

) -> SuiResult<CheckedInputObjects> {
let tx_digest = transaction.digest();
let tx_data = transaction.data().transaction_data();

Expand Down Expand Up @@ -903,6 +913,23 @@ impl AuthorityState {
)?;
}

Ok(checked_input_objects)
}

/// This is a private method and should be kept that way. It doesn't check whether
/// the provided transaction is a system transaction, and hence can only be called internally.
#[instrument(level = "trace", skip_all)]
async fn handle_transaction_impl(
&self,
transaction: VerifiedTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransaction> {
// Ensure that validator cannot reconfigure while we are signing the tx
let _execution_lock = self.execution_lock_for_signing().await;

let checked_input_objects =
self.handle_transaction_deny_checks(&transaction, epoch_store)?;

let owned_objects = checked_input_objects.inner().filter_owned_objects();

let signed_transaction = VerifiedSignedTransaction::new(
Expand Down Expand Up @@ -974,6 +1001,51 @@ impl AuthorityState {
}
}

#[instrument(level = "trace", skip_all)]
pub async fn handle_transaction_v2(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
transaction: VerifiedTransaction,
) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
let tx_digest = *transaction.digest();
debug!("handle_transaction_v2");

// Ensure an idempotent answer.
let tx_status = self.get_transaction_status(&tx_digest, epoch_store)?;
if tx_status.is_some() {
return Ok(tx_status);
}

let _metrics_guard = self
.metrics
.authority_state_handle_transaction_v2_latency
.start_timer();
self.metrics.tx_orders.inc();

// The should_accept_user_certs check here is best effort, because
// between a validator signs a tx and a cert is formed, the validator
// could close the window.
if !epoch_store
.get_reconfig_state_read_lock_guard()
.should_accept_user_certs()
{
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

match self.handle_transaction_impl(transaction, epoch_store).await {
// TODO(fastpath): We don't actually need the signed transaction here but just call
// into this function to acquire locks. Consider refactoring to avoid the extra work.
Ok(_signed) => Ok(None),
// It happens frequently that while we are checking the validity of the transaction, it
// has just been executed.
// In that case, we could still return Ok to avoid showing confusing errors.
Err(e) => self
.get_transaction_status(&tx_digest, epoch_store)?
.ok_or(e)
.map(Some),
}
}

pub fn check_system_overload_at_signing(&self) -> bool {
self.config
.authority_overload_config
Expand Down Expand Up @@ -1127,7 +1199,21 @@ impl AuthorityState {
self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
}

self.notify_read_effects(certificate).await
self.notify_read_effects(*certificate.digest()).await
}

/// Awaits the effects of executing a user transaction.
///
/// Relies on consensus to enqueue the transaction for execution.
pub async fn await_transaction_effects(
&self,
digest: TransactionDigest,
) -> SuiResult<TransactionEffects> {
let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
debug!("await_transaction");

// TODO(fastpath): Add handling for transactions rejected by Mysticeti fast path.
self.notify_read_effects(digest).await
}

/// Internal logic to execute a certificate.
Expand Down Expand Up @@ -1219,10 +1305,10 @@ impl AuthorityState {

pub async fn notify_read_effects(
&self,
certificate: &VerifiedCertificate,
digest: TransactionDigest,
) -> SuiResult<TransactionEffects> {
self.get_transaction_cache_reader()
.notify_read_executed_effects(&[*certificate.digest()])
.notify_read_executed_effects(&[digest])
.await
.map(|mut r| r.pop().expect("must return correct number of effects"))
}
Expand Down
13 changes: 4 additions & 9 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1965,17 +1965,12 @@ impl AuthorityPerEpochStore {
.any(|processed| processed))
}

/// Check whether any certificates were processed by consensus.
/// This handles multiple certificates at once.
pub fn is_all_tx_certs_consensus_message_processed<'a>(
/// Returns true if all messages with the given keys were processed by consensus.
pub fn all_external_consensus_messages_processed(
&self,
certificates: impl Iterator<Item = &'a VerifiedCertificate>,
keys: impl Iterator<Item = ConsensusTransactionKey>,
) -> SuiResult<bool> {
let keys = certificates.map(|cert| {
SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
*cert.digest(),
))
});
let keys = keys.map(SequencedConsensusTransactionKey::External);
Ok(self
.check_consensus_messages_processed(keys)?
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ pub async fn enqueue_all_and_execute_all(
);
let mut output = Vec::new();
for cert in certificates {
let effects = authority.notify_read_effects(&cert).await?;
let effects = authority.notify_read_effects(*cert.digest()).await?;
output.push(effects);
}
Ok(output)
Expand Down
Loading

0 comments on commit b3b80f7

Please sign in to comment.