@@ -11,8 +11,9 @@ use crate::{
1111 EoaExecutorStore ,
1212 events:: EoaExecutorEvent ,
1313 store:: {
14- BorrowedTransactionData , ConfirmedTransaction , EoaHealth , PendingTransaction ,
15- SubmittedTransactionDehydrated , TransactionAttempt , TransactionStoreError ,
14+ BorrowedTransactionData , ConfirmedTransaction , EoaExecutorStoreKeys , EoaHealth ,
15+ PendingTransaction , SubmittedTransactionDehydrated , TransactionAttempt ,
16+ TransactionStoreError ,
1617 borrowed:: { BorrowedProcessingReport , ProcessBorrowedTransactions , SubmissionResult } ,
1718 pending:: {
1819 MovePendingToBorrowedWithIncrementedNonces , MovePendingToBorrowedWithRecycledNonces ,
@@ -140,17 +141,28 @@ impl AtomicEoaExecutorStore {
140141 ///
141142 /// The transactions must have sequential nonces starting from the current optimistic count.
142143 /// This operation validates nonce ordering and atomically moves all transactions.
144+ #[ tracing:: instrument( skip_all, fields( transactions = ?transactions) ) ]
143145 pub async fn atomic_move_pending_to_borrowed_with_incremented_nonces (
144146 & self ,
145147 transactions : & [ BorrowedTransactionData ] ,
146148 ) -> Result < usize , TransactionStoreError > {
147- self . execute_with_watch_and_retry ( & MovePendingToBorrowedWithIncrementedNonces {
148- transactions,
149- keys : & self . keys ,
150- eoa : self . eoa ,
151- chain_id : self . chain_id ,
152- } )
153- . await
149+ let ( moved_count, new_optimistic_tx_count) = self
150+ . execute_with_watch_and_retry ( & MovePendingToBorrowedWithIncrementedNonces {
151+ transactions,
152+ keys : & self . keys ,
153+ eoa : self . eoa ,
154+ chain_id : self . chain_id ,
155+ } )
156+ . await ?;
157+
158+ if let Some ( new_optimistic_tx_count) = new_optimistic_tx_count {
159+ tracing:: info!(
160+ new_optimistic_tx_count = new_optimistic_tx_count,
161+ "Updated optimistic transaction count to {new_optimistic_tx_count}"
162+ ) ;
163+ }
164+
165+ Ok ( moved_count)
154166 }
155167
156168 /// Atomically move multiple pending transactions to borrowed state using recycled nonces
@@ -393,6 +405,7 @@ impl AtomicEoaExecutorStore {
393405 /// Synchronize nonces with the chain
394406 ///
395407 /// Part of standard nonce management flow, called in the confirm stage when chain nonce advances, and we need to update our cached nonce
408+ #[ tracing:: instrument( skip_all, fields( current_chain_tx_count = current_chain_tx_count) ) ]
396409 pub async fn update_cached_transaction_count (
397410 & self ,
398411 current_chain_tx_count : u64 ,
@@ -502,47 +515,35 @@ impl AtomicEoaExecutorStore {
502515 ///
503516 /// This is called when we have too many recycled nonces and detect something wrong
504517 /// We want to start fresh, with the chain nonce as the new optimistic nonce
518+ #[ tracing:: instrument( skip_all) ]
505519 pub async fn reset_nonces (
506520 & self ,
507521 current_chain_tx_count : u64 ,
508522 ) -> Result < ( ) , TransactionStoreError > {
509- let now = chrono:: Utc :: now ( ) . timestamp_millis ( ) . max ( 0 ) as u64 ;
510-
511- let current_health = self . get_eoa_health ( ) . await ?;
512-
513- // Prepare health update if health data exists
514- let health_update = if let Some ( mut health) = current_health {
515- health. nonce_resets . push ( now) ;
516- Some ( serde_json:: to_string ( & health) ?)
517- } else {
518- None
523+ let reset_tx = ResetNoncesTransaction {
524+ keys : & self . store . keys ,
525+ current_chain_tx_count,
519526 } ;
520527
521- self . with_lock_check ( |pipeline| {
522- let optimistic_key = self . optimistic_transaction_count_key_name ( ) ;
523- let cached_nonce_key = self . last_transaction_count_key_name ( ) ;
524- let recycled_key = self . recycled_nonces_zset_name ( ) ;
525- let manual_reset_key = self . manual_reset_key_name ( ) ;
528+ let reset_result = self . execute_with_watch_and_retry ( & reset_tx) . await ;
526529
527- // Update health data only if it exists
528- if let Some ( ref health_json) = health_update {
529- let health_key = self . eoa_health_key_name ( ) ;
530- pipeline. set ( & health_key, health_json) ;
530+ match & reset_result {
531+ Ok ( ( ) ) => {
532+ tracing:: info!(
533+ current_chain_tx_count = current_chain_tx_count,
534+ "Reset nonces successfully"
535+ ) ;
531536 }
537+ Err ( e) => {
538+ tracing:: error!(
539+ current_chain_tx_count = current_chain_tx_count,
540+ error = ?e,
541+ "Failed to reset nonces"
542+ ) ;
543+ }
544+ }
532545
533- // Reset the optimistic nonce
534- pipeline. set ( & optimistic_key, current_chain_tx_count) ;
535-
536- // Reset the cached nonce
537- pipeline. set ( & cached_nonce_key, current_chain_tx_count) ;
538-
539- // Reset the recycled nonces
540- pipeline. del ( recycled_key) ;
541-
542- // Delete the manual reset key
543- pipeline. del ( & manual_reset_key) ;
544- } )
545- . await
546+ reset_result
546547 }
547548
548549 /// Fail a transaction that's in the pending state (remove from pending and fail)
@@ -654,3 +655,79 @@ impl AtomicEoaExecutorStore {
654655 . await
655656 }
656657}
658+
659+ /// SafeRedisTransaction implementation for resetting nonces
660+ pub struct ResetNoncesTransaction < ' a > {
661+ pub keys : & ' a EoaExecutorStoreKeys ,
662+ pub current_chain_tx_count : u64 ,
663+ }
664+
665+ impl SafeRedisTransaction for ResetNoncesTransaction < ' _ > {
666+ type ValidationData = Option < String > ;
667+ type OperationResult = ( ) ;
668+
669+ fn name ( & self ) -> & str {
670+ "reset nonces"
671+ }
672+
673+ fn watch_keys ( & self ) -> Vec < String > {
674+ vec ! [
675+ self . keys. optimistic_transaction_count_key_name( ) ,
676+ self . keys. last_transaction_count_key_name( ) ,
677+ self . keys. recycled_nonces_zset_name( ) ,
678+ self . keys. manual_reset_key_name( ) ,
679+ ]
680+ }
681+
682+ async fn validation (
683+ & self ,
684+ _conn : & mut ConnectionManager ,
685+ store : & EoaExecutorStore ,
686+ ) -> Result < Self :: ValidationData , TransactionStoreError > {
687+ let now = chrono:: Utc :: now ( ) . timestamp_millis ( ) . max ( 0 ) as u64 ;
688+
689+ // Get current health data to prepare update
690+ let current_health = store. get_eoa_health ( ) . await ?;
691+ let health_update = if let Some ( mut health) = current_health {
692+ health. nonce_resets . push ( now) ;
693+ // Keep only the last 5 nonce reset timestamps
694+ if health. nonce_resets . len ( ) > 5 {
695+ health. nonce_resets . drain ( 0 ..health. nonce_resets . len ( ) - 5 ) ;
696+ }
697+ Some ( serde_json:: to_string ( & health) ?)
698+ } else {
699+ None
700+ } ;
701+
702+ Ok ( health_update)
703+ }
704+
705+ fn operation (
706+ & self ,
707+ pipeline : & mut twmq:: redis:: Pipeline ,
708+ health_update : Self :: ValidationData ,
709+ ) -> Self :: OperationResult {
710+ let optimistic_key = self . keys . optimistic_transaction_count_key_name ( ) ;
711+ let cached_nonce_key = self . keys . last_transaction_count_key_name ( ) ;
712+ let recycled_key = self . keys . recycled_nonces_zset_name ( ) ;
713+ let manual_reset_key = self . keys . manual_reset_key_name ( ) ;
714+
715+ // Update health data only if it exists
716+ if let Some ( ref health_json) = health_update {
717+ let health_key = self . keys . eoa_health_key_name ( ) ;
718+ pipeline. set ( & health_key, health_json) ;
719+ }
720+
721+ // Reset the optimistic nonce
722+ pipeline. set ( & optimistic_key, self . current_chain_tx_count ) ;
723+
724+ // Reset the cached nonce
725+ pipeline. set ( & cached_nonce_key, self . current_chain_tx_count ) ;
726+
727+ // Reset the recycled nonces
728+ pipeline. del ( recycled_key) ;
729+
730+ // Delete the manual reset key
731+ pipeline. del ( & manual_reset_key) ;
732+ }
733+ }
0 commit comments