diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index aa37573..db3f0b5 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -11,8 +11,9 @@ use crate::{ EoaExecutorStore, events::EoaExecutorEvent, store::{ - BorrowedTransactionData, ConfirmedTransaction, EoaHealth, PendingTransaction, - SubmittedTransactionDehydrated, TransactionAttempt, TransactionStoreError, + BorrowedTransactionData, ConfirmedTransaction, EoaExecutorStoreKeys, EoaHealth, + PendingTransaction, SubmittedTransactionDehydrated, TransactionAttempt, + TransactionStoreError, borrowed::{BorrowedProcessingReport, ProcessBorrowedTransactions, SubmissionResult}, pending::{ MovePendingToBorrowedWithIncrementedNonces, MovePendingToBorrowedWithRecycledNonces, @@ -140,17 +141,28 @@ impl AtomicEoaExecutorStore { /// /// The transactions must have sequential nonces starting from the current optimistic count. /// This operation validates nonce ordering and atomically moves all transactions. + #[tracing::instrument(skip_all, fields(transactions = ?transactions))] pub async fn atomic_move_pending_to_borrowed_with_incremented_nonces( &self, transactions: &[BorrowedTransactionData], ) -> Result { - self.execute_with_watch_and_retry(&MovePendingToBorrowedWithIncrementedNonces { - transactions, - keys: &self.keys, - eoa: self.eoa, - chain_id: self.chain_id, - }) - .await + let (moved_count, new_optimistic_tx_count) = self + .execute_with_watch_and_retry(&MovePendingToBorrowedWithIncrementedNonces { + transactions, + keys: &self.keys, + eoa: self.eoa, + chain_id: self.chain_id, + }) + .await?; + + if let Some(new_optimistic_tx_count) = new_optimistic_tx_count { + tracing::info!( + new_optimistic_tx_count = new_optimistic_tx_count, + "Updated optimistic transaction count to {new_optimistic_tx_count}" + ); + } + + Ok(moved_count) } /// Atomically move multiple pending transactions to borrowed state using recycled nonces @@ -393,6 +405,7 @@ impl AtomicEoaExecutorStore { /// Synchronize nonces with the chain /// /// Part of standard nonce management flow, called in the confirm stage when chain nonce advances, and we need to update our cached nonce + #[tracing::instrument(skip_all, fields(current_chain_tx_count = current_chain_tx_count))] pub async fn update_cached_transaction_count( &self, current_chain_tx_count: u64, @@ -502,47 +515,35 @@ impl AtomicEoaExecutorStore { /// /// This is called when we have too many recycled nonces and detect something wrong /// We want to start fresh, with the chain nonce as the new optimistic nonce + #[tracing::instrument(skip_all)] pub async fn reset_nonces( &self, current_chain_tx_count: u64, ) -> Result<(), TransactionStoreError> { - let now = chrono::Utc::now().timestamp_millis().max(0) as u64; - - let current_health = self.get_eoa_health().await?; - - // Prepare health update if health data exists - let health_update = if let Some(mut health) = current_health { - health.nonce_resets.push(now); - Some(serde_json::to_string(&health)?) - } else { - None + let reset_tx = ResetNoncesTransaction { + keys: &self.store.keys, + current_chain_tx_count, }; - self.with_lock_check(|pipeline| { - let optimistic_key = self.optimistic_transaction_count_key_name(); - let cached_nonce_key = self.last_transaction_count_key_name(); - let recycled_key = self.recycled_nonces_zset_name(); - let manual_reset_key = self.manual_reset_key_name(); + let reset_result = self.execute_with_watch_and_retry(&reset_tx).await; - // Update health data only if it exists - if let Some(ref health_json) = health_update { - let health_key = self.eoa_health_key_name(); - pipeline.set(&health_key, health_json); + match &reset_result { + Ok(()) => { + tracing::info!( + current_chain_tx_count = current_chain_tx_count, + "Reset nonces successfully" + ); } + Err(e) => { + tracing::error!( + current_chain_tx_count = current_chain_tx_count, + error = ?e, + "Failed to reset nonces" + ); + } + } - // Reset the optimistic nonce - pipeline.set(&optimistic_key, current_chain_tx_count); - - // Reset the cached nonce - pipeline.set(&cached_nonce_key, current_chain_tx_count); - - // Reset the recycled nonces - pipeline.del(recycled_key); - - // Delete the manual reset key - pipeline.del(&manual_reset_key); - }) - .await + reset_result } /// Fail a transaction that's in the pending state (remove from pending and fail) @@ -654,3 +655,79 @@ impl AtomicEoaExecutorStore { .await } } + +/// SafeRedisTransaction implementation for resetting nonces +pub struct ResetNoncesTransaction<'a> { + pub keys: &'a EoaExecutorStoreKeys, + pub current_chain_tx_count: u64, +} + +impl SafeRedisTransaction for ResetNoncesTransaction<'_> { + type ValidationData = Option; + type OperationResult = (); + + fn name(&self) -> &str { + "reset nonces" + } + + fn watch_keys(&self) -> Vec { + vec![ + self.keys.optimistic_transaction_count_key_name(), + self.keys.last_transaction_count_key_name(), + self.keys.recycled_nonces_zset_name(), + self.keys.manual_reset_key_name(), + ] + } + + async fn validation( + &self, + _conn: &mut ConnectionManager, + store: &EoaExecutorStore, + ) -> Result { + let now = chrono::Utc::now().timestamp_millis().max(0) as u64; + + // Get current health data to prepare update + let current_health = store.get_eoa_health().await?; + let health_update = if let Some(mut health) = current_health { + health.nonce_resets.push(now); + // Keep only the last 5 nonce reset timestamps + if health.nonce_resets.len() > 5 { + health.nonce_resets.drain(0..health.nonce_resets.len() - 5); + } + Some(serde_json::to_string(&health)?) + } else { + None + }; + + Ok(health_update) + } + + fn operation( + &self, + pipeline: &mut twmq::redis::Pipeline, + health_update: Self::ValidationData, + ) -> Self::OperationResult { + let optimistic_key = self.keys.optimistic_transaction_count_key_name(); + let cached_nonce_key = self.keys.last_transaction_count_key_name(); + let recycled_key = self.keys.recycled_nonces_zset_name(); + let manual_reset_key = self.keys.manual_reset_key_name(); + + // Update health data only if it exists + if let Some(ref health_json) = health_update { + let health_key = self.keys.eoa_health_key_name(); + pipeline.set(&health_key, health_json); + } + + // Reset the optimistic nonce + pipeline.set(&optimistic_key, self.current_chain_tx_count); + + // Reset the cached nonce + pipeline.set(&cached_nonce_key, self.current_chain_tx_count); + + // Reset the recycled nonces + pipeline.del(recycled_key); + + // Delete the manual reset key + pipeline.del(&manual_reset_key); + } +} diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index a1e0e3a..c58bdbf 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -284,7 +284,10 @@ impl EoaExecutorStoreKeys { "{ns}:eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa ), - None => format!("eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa), + None => format!( + "eoa_executor:pending_manual_reset:{}:{}", + self.chain_id, self.eoa + ), } } } @@ -416,12 +419,15 @@ impl EoaExecutorStore { worker_id: worker_id.to_string(), }); } + let conflict_worker_id = conn.get::<_, Option>(&lock_key).await?; + // Lock exists, forcefully take it over tracing::warn!( eoa = ?self.eoa, chain_id = self.chain_id, worker_id = worker_id, - "Forcefully taking over EOA lock from stalled worker" + conflict_worker_id = ?conflict_worker_id, + "Forcefully taking over EOA lock from stalled worker." ); // Force set - no expiry, only released by explicit takeover let _: () = conn.set(&lock_key, worker_id).await?; @@ -504,15 +510,6 @@ impl EoaExecutorStore { } } - /// Peek recycled nonces without removing them - pub async fn peek_recycled_nonces(&self) -> Result, TransactionStoreError> { - let recycled_key = self.recycled_nonces_zset_name(); - let mut conn = self.redis.clone(); - - let nonces: Vec = conn.zrange(&recycled_key, 0, -1).await?; - Ok(nonces) - } - /// Peek at pending transactions without removing them (safe for planning) pub async fn peek_pending_transactions( &self, diff --git a/executors/src/eoa/store/pending.rs b/executors/src/eoa/store/pending.rs index fe09f2a..637e010 100644 --- a/executors/src/eoa/store/pending.rs +++ b/executors/src/eoa/store/pending.rs @@ -31,7 +31,7 @@ pub struct MovePendingToBorrowedWithIncrementedNonces<'a> { impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { type ValidationData = Vec; // serialized borrowed transactions - type OperationResult = usize; // number of transactions processed + type OperationResult = (usize, Option); // number of transactions processed, new optimistic nonce fn name(&self) -> &str { "pending->borrowed with incremented nonces" @@ -59,10 +59,11 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { let current_optimistic: Option = conn .get(self.keys.optimistic_transaction_count_key_name()) .await?; - let current_optimistic_nonce = current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired { - eoa: self.eoa, - chain_id: self.chain_id, - })?; + let current_optimistic_nonce = + current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired { + eoa: self.eoa, + chain_id: self.chain_id, + })?; // Extract and validate nonces let mut nonces: Vec = self @@ -134,13 +135,17 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { pipeline.hset(&borrowed_key, &tx.transaction_id, borrowed_json); } - // Update optimistic tx count to highest nonce + 1 - if let Some(last_tx) = self.transactions.last() { - let new_optimistic_tx_count = last_tx.signed_transaction.nonce() + 1; + let new_optimistic_tx_count = self + .transactions + .last() + .map(|tx| tx.signed_transaction.nonce() + 1); + + // Update optimistic tx count to highest nonce + 1, if we have a new optimistic nonce + if let Some(new_optimistic_tx_count) = new_optimistic_tx_count { pipeline.set(&optimistic_key, new_optimistic_tx_count); } - self.transactions.len() + (self.transactions.len(), new_optimistic_tx_count) } } diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs index 55b6530..34ba567 100644 --- a/executors/src/eoa/worker/confirm.rs +++ b/executors/src/eoa/worker/confirm.rs @@ -27,7 +27,7 @@ pub struct ConfirmedTransactionWithRichReceipt { impl EoaExecutorWorker { // ========== CONFIRM FLOW ========== - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))] pub async fn confirm_flow(&self) -> Result { // Get fresh on-chain transaction count let current_chain_transaction_count = self @@ -53,6 +53,10 @@ impl EoaExecutorWorker { let cached_transaction_count = match self.store.get_cached_transaction_count().await { Err(e) => match e { TransactionStoreError::NonceSyncRequired { .. } => { + tracing::warn!( + cached_transaction_count = current_chain_transaction_count, + "Nonce sync required, store was uninitialized, updating cached transaction count with current chain transaction count" + ); self.store .update_cached_transaction_count(current_chain_transaction_count) .await?; diff --git a/executors/src/eoa/worker/error.rs b/executors/src/eoa/worker/error.rs index 031c23c..6fbe28e 100644 --- a/executors/src/eoa/worker/error.rs +++ b/executors/src/eoa/worker/error.rs @@ -140,8 +140,9 @@ impl UserCancellable for EoaExecutorWorkerError { // ========== SIMPLE ERROR CLASSIFICATION ========== #[derive(Debug)] pub enum SendErrorClassification { - PossiblySent, // "nonce too low", "already known" etc - DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc + PossiblySent, // "nonce too low", "already known" etc + DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc + DeterministicFailureNonRetryable, // Non-retryable deterministic failure } #[derive(PartialEq, Eq, Debug)] @@ -185,11 +186,14 @@ pub fn classify_send_error( if error_str.contains("malformed") || error_str.contains("gas limit") || error_str.contains("intrinsic gas too low") - || error_str.contains("oversized") { return SendErrorClassification::DeterministicFailure; } + if error_str.contains("oversized") { + return SendErrorClassification::DeterministicFailureNonRetryable; + } + tracing::warn!( "Unknown send error: {}. PLEASE REPORT FOR ADDING CORRECT CLASSIFICATION [NOTIFY]", error_str @@ -305,6 +309,15 @@ impl SubmissionResult { transaction: borrowed_transaction.clone().into(), } } + SendErrorClassification::DeterministicFailureNonRetryable => SubmissionResult { + result: SubmissionResultType::Fail( + EoaExecutorWorkerError::TransactionSendError { + message: format!("Transaction send failed: {rpc_error}"), + inner_error: rpc_error.to_engine_error(chain), + }, + ), + transaction: borrowed_transaction.clone().into(), + }, } } } diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index b460cb3..51e8242 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -23,7 +23,9 @@ use crate::eoa::authorization_cache::EoaAuthorizationCache; use crate::eoa::store::{ AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult, }; -use crate::metrics::{calculate_duration_seconds, calculate_duration_seconds_from_twmq, current_timestamp_ms, record_eoa_job_processing_time}; +use crate::metrics::{ + calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time, +}; use crate::webhook::WebhookJobHandler; pub mod confirm; @@ -148,6 +150,8 @@ where }) .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; + let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token); + // 2. CREATE SCOPED STORE (acquires lock) let scoped = EoaExecutorStore::new( self.redis.clone(), @@ -155,13 +159,13 @@ where data.eoa_address, data.chain_id, ) - .acquire_eoa_lock_aggressively(&job.lease_token) + .acquire_eoa_lock_aggressively(&worker_id) .await .map_err(|e| Into::::into(e).handle())?; let delegated_account = DelegatedAccount::new(data.eoa_address, chain.clone()); - // if there's an error checking 7702 delegation here, we'll just assume it's not a minimal account for the pursposes of max in flight + // if there's an error checking 7702 delegation here, we'll just assume it's not a minimal account for the purposes of max in flight let is_minimal_account = self .authorization_cache .is_minimal_account(&delegated_account) @@ -192,7 +196,7 @@ where let job_start_time = current_timestamp_ms(); let result = worker.execute_main_workflow().await?; if let Err(e) = worker.release_eoa_lock().await { - tracing::error!(error = ?e, "Error releasing EOA lock"); + tracing::error!(error = ?e, worker_id = worker_id, "Error releasing EOA lock"); } // Record EOA job processing metrics diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index ea32023..23997d1 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -16,7 +16,7 @@ const HEALTH_CHECK_INTERVAL: u64 = 300; // 5 minutes in seconds impl EoaExecutorWorker { // ========== SEND FLOW ========== - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))] pub async fn send_flow(&self) -> Result { // 1. Get EOA health (initializes if needed) and check if we should update balance let mut health = self.get_eoa_health().await?; @@ -62,7 +62,7 @@ impl EoaExecutorWorker { total_sent += self.process_recycled_nonces().await?; // 3. Only proceed to new nonces if we successfully used all recycled nonces - let remaining_recycled = self.store.peek_recycled_nonces().await?.len(); + let remaining_recycled = self.store.clean_and_get_recycled_nonces().await?.len(); if remaining_recycled == 0 { let inflight_budget = self.store.get_inflight_budget(self.max_inflight).await?; if inflight_budget > 0 {