diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 91bdbd9..83d9e54 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -12,7 +12,7 @@ use crate::eoa::{ }, worker::error::EoaExecutorWorkerError, }; -use crate::metrics::{current_timestamp_ms, calculate_duration_seconds, EoaMetrics}; +use crate::metrics::{EoaMetrics, calculate_duration_seconds, current_timestamp_ms}; use crate::webhook::{WebhookJobHandler, queue_webhook_envelopes}; #[derive(Debug, Clone)] @@ -62,7 +62,10 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { } fn watch_keys(&self) -> Vec { - vec![self.keys.borrowed_transactions_hashmap_name()] + vec![ + self.keys.borrowed_transactions_hashmap_name(), + self.keys.recycled_nonces_zset_name(), + ] } async fn validation( @@ -121,15 +124,13 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { SubmissionResultType::Success => { // Record metrics: transaction queued to sent let sent_timestamp = current_timestamp_ms(); - let queued_to_sent_duration = calculate_duration_seconds( - result.transaction.queued_at, - sent_timestamp - ); + let queued_to_sent_duration = + calculate_duration_seconds(result.transaction.queued_at, sent_timestamp); // Record metrics using the clean EoaMetrics abstraction self.eoa_metrics.record_transaction_sent( self.keys.eoa, self.keys.chain_id, - queued_to_sent_duration + queued_to_sent_duration, ); // Add to submitted zset @@ -189,7 +190,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { // Update transaction data status let tx_data_key = self.keys.transaction_data_key_name(transaction_id); pipeline.hset(&tx_data_key, "status", "pending"); - + // ask for this nonce to be recycled because we did not consume the nonce pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce); diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index a4c81f1..9e02173 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -560,6 +560,7 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> { vec![ self.keys.recycled_nonces_zset_name(), self.keys.last_transaction_count_key_name(), + self.keys.optimistic_transaction_count_key_name(), self.keys.submitted_transactions_zset_name(), ] } @@ -618,6 +619,11 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> { "+inf", ); + pipeline.set( + self.keys.optimistic_transaction_count_key_name(), + highest_submitted_nonce + 1, + ); + recycled_nonces } } diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index 3cb5aa0..36fe55a 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -4,12 +4,10 @@ use engine_core::{chain::Chain, error::AlloyRpcErrorToEngineError}; use crate::eoa::{ store::{BorrowedTransaction, PendingTransaction, SubmissionResult}, worker::{ - EoaExecutorWorker, error::{ - EoaExecutorWorkerError, SendContext, is_retryable_preparation_error, - should_update_balance_threshold, - }, - }, + is_retryable_preparation_error, should_update_balance_threshold, EoaExecutorWorkerError, SendContext + }, EoaExecutorWorker + }, EoaExecutorStore, }; const HEALTH_CHECK_INTERVAL_MS: u64 = 60 * 5 * 1000; // 5 minutes in milliseconds @@ -20,7 +18,7 @@ impl EoaExecutorWorker { pub async fn send_flow(&self) -> Result { // 1. Get EOA health (initializes if needed) and check if we should update balance let mut health = self.get_eoa_health().await?; - let now = chrono::Utc::now().timestamp_millis().max(0) as u64; + let now = EoaExecutorStore::now(); // Update balance if it's stale // TODO: refactor this, very ugly