Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 118 additions & 41 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use crate::{
EoaExecutorStore,
events::EoaExecutorEvent,
store::{
BorrowedTransactionData, ConfirmedTransaction, EoaHealth, PendingTransaction,
SubmittedTransactionDehydrated, TransactionAttempt, TransactionStoreError,
BorrowedTransactionData, ConfirmedTransaction, EoaExecutorStoreKeys, EoaHealth,
PendingTransaction, SubmittedTransactionDehydrated, TransactionAttempt,
TransactionStoreError,
borrowed::{BorrowedProcessingReport, ProcessBorrowedTransactions, SubmissionResult},
pending::{
MovePendingToBorrowedWithIncrementedNonces, MovePendingToBorrowedWithRecycledNonces,
Expand Down Expand Up @@ -140,17 +141,28 @@ impl AtomicEoaExecutorStore {
///
/// The transactions must have sequential nonces starting from the current optimistic count.
/// This operation validates nonce ordering and atomically moves all transactions.
#[tracing::instrument(skip_all, fields(transactions = ?transactions))]
pub async fn atomic_move_pending_to_borrowed_with_incremented_nonces(
&self,
transactions: &[BorrowedTransactionData],
) -> Result<usize, TransactionStoreError> {
self.execute_with_watch_and_retry(&MovePendingToBorrowedWithIncrementedNonces {
transactions,
keys: &self.keys,
eoa: self.eoa,
chain_id: self.chain_id,
})
.await
let (moved_count, new_optimistic_tx_count) = self
.execute_with_watch_and_retry(&MovePendingToBorrowedWithIncrementedNonces {
transactions,
keys: &self.keys,
eoa: self.eoa,
chain_id: self.chain_id,
})
.await?;

if let Some(new_optimistic_tx_count) = new_optimistic_tx_count {
tracing::info!(
new_optimistic_tx_count = new_optimistic_tx_count,
"Updated optimistic transaction count to {new_optimistic_tx_count}"
);
}

Ok(moved_count)
}

/// Atomically move multiple pending transactions to borrowed state using recycled nonces
Expand Down Expand Up @@ -393,6 +405,7 @@ impl AtomicEoaExecutorStore {
/// Synchronize nonces with the chain
///
/// Part of standard nonce management flow, called in the confirm stage when chain nonce advances, and we need to update our cached nonce
#[tracing::instrument(skip_all, fields(current_chain_tx_count = current_chain_tx_count))]
pub async fn update_cached_transaction_count(
&self,
current_chain_tx_count: u64,
Expand Down Expand Up @@ -502,47 +515,35 @@ impl AtomicEoaExecutorStore {
///
/// This is called when we have too many recycled nonces and detect something wrong
/// We want to start fresh, with the chain nonce as the new optimistic nonce
#[tracing::instrument(skip_all)]
pub async fn reset_nonces(
&self,
current_chain_tx_count: u64,
) -> Result<(), TransactionStoreError> {
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

let current_health = self.get_eoa_health().await?;

// Prepare health update if health data exists
let health_update = if let Some(mut health) = current_health {
health.nonce_resets.push(now);
Some(serde_json::to_string(&health)?)
} else {
None
let reset_tx = ResetNoncesTransaction {
keys: &self.store.keys,
current_chain_tx_count,
};

self.with_lock_check(|pipeline| {
let optimistic_key = self.optimistic_transaction_count_key_name();
let cached_nonce_key = self.last_transaction_count_key_name();
let recycled_key = self.recycled_nonces_zset_name();
let manual_reset_key = self.manual_reset_key_name();
let reset_result = self.execute_with_watch_and_retry(&reset_tx).await;

// Update health data only if it exists
if let Some(ref health_json) = health_update {
let health_key = self.eoa_health_key_name();
pipeline.set(&health_key, health_json);
match &reset_result {
Ok(()) => {
tracing::info!(
current_chain_tx_count = current_chain_tx_count,
"Reset nonces successfully"
);
}
Err(e) => {
tracing::error!(
current_chain_tx_count = current_chain_tx_count,
error = ?e,
"Failed to reset nonces"
);
}
}

// Reset the optimistic nonce
pipeline.set(&optimistic_key, current_chain_tx_count);

// Reset the cached nonce
pipeline.set(&cached_nonce_key, current_chain_tx_count);

// Reset the recycled nonces
pipeline.del(recycled_key);

// Delete the manual reset key
pipeline.del(&manual_reset_key);
})
.await
reset_result
}

/// Fail a transaction that's in the pending state (remove from pending and fail)
Expand Down Expand Up @@ -654,3 +655,79 @@ impl AtomicEoaExecutorStore {
.await
}
}

/// SafeRedisTransaction implementation for resetting nonces
pub struct ResetNoncesTransaction<'a> {
pub keys: &'a EoaExecutorStoreKeys,
pub current_chain_tx_count: u64,
}

impl SafeRedisTransaction for ResetNoncesTransaction<'_> {
type ValidationData = Option<String>;
type OperationResult = ();

fn name(&self) -> &str {
"reset nonces"
}

fn watch_keys(&self) -> Vec<String> {
vec![
self.keys.optimistic_transaction_count_key_name(),
self.keys.last_transaction_count_key_name(),
self.keys.recycled_nonces_zset_name(),
self.keys.manual_reset_key_name(),
]
}

async fn validation(
&self,
_conn: &mut ConnectionManager,
store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

// Get current health data to prepare update
let current_health = store.get_eoa_health().await?;
let health_update = if let Some(mut health) = current_health {
health.nonce_resets.push(now);
// Keep only the last 5 nonce reset timestamps
if health.nonce_resets.len() > 5 {
health.nonce_resets.drain(0..health.nonce_resets.len() - 5);
}
Some(serde_json::to_string(&health)?)
} else {
None
};

Ok(health_update)
}

fn operation(
&self,
pipeline: &mut twmq::redis::Pipeline,
health_update: Self::ValidationData,
) -> Self::OperationResult {
let optimistic_key = self.keys.optimistic_transaction_count_key_name();
let cached_nonce_key = self.keys.last_transaction_count_key_name();
let recycled_key = self.keys.recycled_nonces_zset_name();
let manual_reset_key = self.keys.manual_reset_key_name();

// Update health data only if it exists
if let Some(ref health_json) = health_update {
let health_key = self.keys.eoa_health_key_name();
pipeline.set(&health_key, health_json);
}

// Reset the optimistic nonce
pipeline.set(&optimistic_key, self.current_chain_tx_count);

// Reset the cached nonce
pipeline.set(&cached_nonce_key, self.current_chain_tx_count);

// Reset the recycled nonces
pipeline.del(recycled_key);

// Delete the manual reset key
pipeline.del(&manual_reset_key);
}
}
19 changes: 8 additions & 11 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ impl EoaExecutorStoreKeys {
"{ns}:eoa_executor:pending_manual_reset:{}:{}",
self.chain_id, self.eoa
),
None => format!("eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa),
None => format!(
"eoa_executor:pending_manual_reset:{}:{}",
self.chain_id, self.eoa
),
}
}
}
Expand Down Expand Up @@ -416,12 +419,15 @@ impl EoaExecutorStore {
worker_id: worker_id.to_string(),
});
}
let conflict_worker_id = conn.get::<_, Option<String>>(&lock_key).await?;

// Lock exists, forcefully take it over
tracing::warn!(
eoa = ?self.eoa,
chain_id = self.chain_id,
worker_id = worker_id,
"Forcefully taking over EOA lock from stalled worker"
conflict_worker_id = ?conflict_worker_id,
"Forcefully taking over EOA lock from stalled worker."
);
// Force set - no expiry, only released by explicit takeover
let _: () = conn.set(&lock_key, worker_id).await?;
Expand Down Expand Up @@ -504,15 +510,6 @@ impl EoaExecutorStore {
}
}

/// Peek recycled nonces without removing them
pub async fn peek_recycled_nonces(&self) -> Result<Vec<u64>, TransactionStoreError> {
let recycled_key = self.recycled_nonces_zset_name();
let mut conn = self.redis.clone();

let nonces: Vec<u64> = conn.zrange(&recycled_key, 0, -1).await?;
Ok(nonces)
}

/// Peek at pending transactions without removing them (safe for planning)
pub async fn peek_pending_transactions(
&self,
Expand Down
23 changes: 14 additions & 9 deletions executors/src/eoa/store/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct MovePendingToBorrowedWithIncrementedNonces<'a> {

impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {
type ValidationData = Vec<String>; // serialized borrowed transactions
type OperationResult = usize; // number of transactions processed
type OperationResult = (usize, Option<u64>); // number of transactions processed, new optimistic nonce

fn name(&self) -> &str {
"pending->borrowed with incremented nonces"
Expand Down Expand Up @@ -59,10 +59,11 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {
let current_optimistic: Option<u64> = conn
.get(self.keys.optimistic_transaction_count_key_name())
.await?;
let current_optimistic_nonce = current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired {
eoa: self.eoa,
chain_id: self.chain_id,
})?;
let current_optimistic_nonce =
current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired {
eoa: self.eoa,
chain_id: self.chain_id,
})?;

// Extract and validate nonces
let mut nonces: Vec<u64> = self
Expand Down Expand Up @@ -134,13 +135,17 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {
pipeline.hset(&borrowed_key, &tx.transaction_id, borrowed_json);
}

// Update optimistic tx count to highest nonce + 1
if let Some(last_tx) = self.transactions.last() {
let new_optimistic_tx_count = last_tx.signed_transaction.nonce() + 1;
let new_optimistic_tx_count = self
.transactions
.last()
.map(|tx| tx.signed_transaction.nonce() + 1);

// Update optimistic tx count to highest nonce + 1, if we have a new optimistic nonce
if let Some(new_optimistic_tx_count) = new_optimistic_tx_count {
pipeline.set(&optimistic_key, new_optimistic_tx_count);
}

self.transactions.len()
(self.transactions.len(), new_optimistic_tx_count)
}
Comment on lines +138 to 149
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Correctness: deriving new_optimistic_tx_count from last() is order-dependent; use max nonce instead.

self.transactions.last() assumes the input slice is sorted by nonce. Validation guarantees the set of nonces is sequential, but not that self.transactions is ordered. If the slice is unsorted, the optimistic nonce can be set too low, leading to duplicate/non-monotonic optimistic nonces on the next enqueue. Compute from the maximum nonce instead.

Apply this diff:

-        let new_optimistic_tx_count = self
-            .transactions
-            .last()
-            .map(|tx| tx.signed_transaction.nonce() + 1);
+        let new_optimistic_tx_count = self
+            .transactions
+            .iter()
+            .map(|tx| tx.signed_transaction.nonce())
+            .max()
+            .map(|n| n + 1);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let new_optimistic_tx_count = self
.transactions
.last()
.map(|tx| tx.signed_transaction.nonce() + 1);
// Update optimistic tx count to highest nonce + 1, if we have a new optimistic nonce
if let Some(new_optimistic_tx_count) = new_optimistic_tx_count {
pipeline.set(&optimistic_key, new_optimistic_tx_count);
}
self.transactions.len()
(self.transactions.len(), new_optimistic_tx_count)
}
let new_optimistic_tx_count = self
.transactions
.iter()
.map(|tx| tx.signed_transaction.nonce())
.max()
.map(|n| n + 1);
// Update optimistic tx count to highest nonce + 1, if we have a new optimistic nonce
if let Some(new_optimistic_tx_count) = new_optimistic_tx_count {
pipeline.set(&optimistic_key, new_optimistic_tx_count);
}
(self.transactions.len(), new_optimistic_tx_count)
}
🤖 Prompt for AI Agents
In executors/src/eoa/store/pending.rs around lines 138-149, the code uses
self.transactions.last() to compute new_optimistic_tx_count which is
order-dependent and wrong when transactions are unsorted; replace that logic by
iterating all transactions to compute the maximum signed_transaction.nonce(),
add 1 to that max to form new_optimistic_tx_count, set that value on the
pipeline (pipeline.set(&optimistic_key, new_optimistic_tx_count)), and return it
(wrapped the same way as before) so the optimistic nonce always advances to
max_nonce + 1 regardless of input order; ensure you preserve the Option
semantics (None if no transactions) and any type conversions expected by
pipeline.set and the function return.

}

Expand Down
6 changes: 5 additions & 1 deletion executors/src/eoa/worker/confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct ConfirmedTransactionWithRichReceipt {

impl<C: Chain> EoaExecutorWorker<C> {
// ========== CONFIRM FLOW ==========
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
Comment on lines +30 to 31
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix tracing field: call the accessor and format properly

Same as send flow: use the accessor and % formatter.

-    #[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
+    #[tracing::instrument(skip_all, fields(worker_id = %self.store.worker_id()))]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
#[tracing::instrument(skip_all, fields(worker_id = %self.store.worker_id()))]
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
🤖 Prompt for AI Agents
In executors/src/eoa/worker/confirm.rs around lines 30 to 31, the tracing
attribute uses the store field directly instead of calling the accessor and
using the percent formatter; change the attribute to call the accessor and
format with `%` (e.g. replace fields(worker_id = self.store.worker_id) with
fields(worker_id = %self.store.worker_id())) so the worker_id is obtained via
its accessor and displayed correctly in the span.

// Get fresh on-chain transaction count
let current_chain_transaction_count = self
Expand All @@ -53,6 +53,10 @@ impl<C: Chain> EoaExecutorWorker<C> {
let cached_transaction_count = match self.store.get_cached_transaction_count().await {
Err(e) => match e {
TransactionStoreError::NonceSyncRequired { .. } => {
tracing::warn!(
cached_transaction_count = current_chain_transaction_count,
"Nonce sync required, store was uninitialized, updating cached transaction count with current chain transaction count"
);
self.store
.update_cached_transaction_count(current_chain_transaction_count)
.await?;
Expand Down
19 changes: 16 additions & 3 deletions executors/src/eoa/worker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ impl UserCancellable for EoaExecutorWorkerError {
// ========== SIMPLE ERROR CLASSIFICATION ==========
#[derive(Debug)]
pub enum SendErrorClassification {
PossiblySent, // "nonce too low", "already known" etc
DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc
PossiblySent, // "nonce too low", "already known" etc
DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc
DeterministicFailureNonRetryable, // Non-retryable deterministic failure
}

#[derive(PartialEq, Eq, Debug)]
Expand Down Expand Up @@ -185,11 +186,14 @@ pub fn classify_send_error(
if error_str.contains("malformed")
|| error_str.contains("gas limit")
|| error_str.contains("intrinsic gas too low")
|| error_str.contains("oversized")
{
return SendErrorClassification::DeterministicFailure;
}

if error_str.contains("oversized") {
return SendErrorClassification::DeterministicFailureNonRetryable;
}

tracing::warn!(
"Unknown send error: {}. PLEASE REPORT FOR ADDING CORRECT CLASSIFICATION [NOTIFY]",
error_str
Expand Down Expand Up @@ -305,6 +309,15 @@ impl SubmissionResult {
transaction: borrowed_transaction.clone().into(),
}
}
SendErrorClassification::DeterministicFailureNonRetryable => SubmissionResult {
result: SubmissionResultType::Fail(
EoaExecutorWorkerError::TransactionSendError {
message: format!("Transaction send failed: {rpc_error}"),
inner_error: rpc_error.to_engine_error(chain),
},
),
transaction: borrowed_transaction.clone().into(),
},
}
}
}
Expand Down
Loading