Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 100 additions & 34 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use crate::{
EoaExecutorStore,
events::EoaExecutorEvent,
store::{
BorrowedTransactionData, ConfirmedTransaction, EoaHealth, PendingTransaction,
SubmittedTransactionDehydrated, TransactionAttempt, TransactionStoreError,
BorrowedTransactionData, ConfirmedTransaction, EoaExecutorStoreKeys, EoaHealth,
PendingTransaction, SubmittedTransactionDehydrated, TransactionAttempt,
TransactionStoreError,
borrowed::{BorrowedProcessingReport, ProcessBorrowedTransactions, SubmissionResult},
pending::{
MovePendingToBorrowedWithIncrementedNonces, MovePendingToBorrowedWithRecycledNonces,
Expand Down Expand Up @@ -393,6 +394,7 @@ impl AtomicEoaExecutorStore {
/// Synchronize nonces with the chain
///
/// Part of standard nonce management flow, called in the confirm stage when chain nonce advances, and we need to update our cached nonce
#[tracing::instrument(skip_all, fields(current_chain_tx_count = current_chain_tx_count))]
pub async fn update_cached_transaction_count(
&self,
current_chain_tx_count: u64,
Expand Down Expand Up @@ -502,47 +504,35 @@ impl AtomicEoaExecutorStore {
///
/// This is called when we have too many recycled nonces and detect something wrong
/// We want to start fresh, with the chain nonce as the new optimistic nonce
#[tracing::instrument(skip_all)]
pub async fn reset_nonces(
&self,
current_chain_tx_count: u64,
) -> Result<(), TransactionStoreError> {
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

let current_health = self.get_eoa_health().await?;

// Prepare health update if health data exists
let health_update = if let Some(mut health) = current_health {
health.nonce_resets.push(now);
Some(serde_json::to_string(&health)?)
} else {
None
let reset_tx = ResetNoncesTransaction {
keys: &self.store.keys,
current_chain_tx_count,
};

self.with_lock_check(|pipeline| {
let optimistic_key = self.optimistic_transaction_count_key_name();
let cached_nonce_key = self.last_transaction_count_key_name();
let recycled_key = self.recycled_nonces_zset_name();
let manual_reset_key = self.manual_reset_key_name();
let reset_result = self.execute_with_watch_and_retry(&reset_tx).await;

// Update health data only if it exists
if let Some(ref health_json) = health_update {
let health_key = self.eoa_health_key_name();
pipeline.set(&health_key, health_json);
match &reset_result {
Ok(()) => {
tracing::info!(
current_chain_tx_count = current_chain_tx_count,
"Reset nonces successfully"
);
}
Err(e) => {
tracing::error!(
current_chain_tx_count = current_chain_tx_count,
error = ?e,
"Failed to reset nonces"
);
}
}

// Reset the optimistic nonce
pipeline.set(&optimistic_key, current_chain_tx_count);

// Reset the cached nonce
pipeline.set(&cached_nonce_key, current_chain_tx_count);

// Reset the recycled nonces
pipeline.del(recycled_key);

// Delete the manual reset key
pipeline.del(&manual_reset_key);
})
.await
reset_result
}

/// Fail a transaction that's in the pending state (remove from pending and fail)
Expand Down Expand Up @@ -654,3 +644,79 @@ impl AtomicEoaExecutorStore {
.await
}
}

/// SafeRedisTransaction implementation for resetting nonces
pub struct ResetNoncesTransaction<'a> {
pub keys: &'a EoaExecutorStoreKeys,
pub current_chain_tx_count: u64,
}

impl SafeRedisTransaction for ResetNoncesTransaction<'_> {
type ValidationData = Option<String>;
type OperationResult = ();

fn name(&self) -> &str {
"reset nonces"
}

fn watch_keys(&self) -> Vec<String> {
vec![
self.keys.optimistic_transaction_count_key_name(),
self.keys.last_transaction_count_key_name(),
self.keys.recycled_nonces_zset_name(),
self.keys.manual_reset_key_name(),
]
}

async fn validation(
&self,
_conn: &mut ConnectionManager,
store: &EoaExecutorStore,
) -> Result<Self::ValidationData, TransactionStoreError> {
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

// Get current health data to prepare update
let current_health = store.get_eoa_health().await?;
let health_update = if let Some(mut health) = current_health {
health.nonce_resets.push(now);
// Keep only the last 5 nonce reset timestamps
if health.nonce_resets.len() > 5 {
health.nonce_resets.drain(0..health.nonce_resets.len() - 5);
}
Some(serde_json::to_string(&health)?)
} else {
None
};

Ok(health_update)
}

fn operation(
&self,
pipeline: &mut twmq::redis::Pipeline,
health_update: Self::ValidationData,
) -> Self::OperationResult {
let optimistic_key = self.keys.optimistic_transaction_count_key_name();
let cached_nonce_key = self.keys.last_transaction_count_key_name();
let recycled_key = self.keys.recycled_nonces_zset_name();
let manual_reset_key = self.keys.manual_reset_key_name();

// Update health data only if it exists
if let Some(ref health_json) = health_update {
let health_key = self.keys.eoa_health_key_name();
pipeline.set(&health_key, health_json);
}

// Reset the optimistic nonce
pipeline.set(&optimistic_key, self.current_chain_tx_count);

// Reset the cached nonce
pipeline.set(&cached_nonce_key, self.current_chain_tx_count);

// Reset the recycled nonces
pipeline.del(recycled_key);

// Delete the manual reset key
pipeline.del(&manual_reset_key);
}
}
10 changes: 8 additions & 2 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ impl EoaExecutorStoreKeys {
"{ns}:eoa_executor:pending_manual_reset:{}:{}",
self.chain_id, self.eoa
),
None => format!("eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa),
None => format!(
"eoa_executor:pending_manual_reset:{}:{}",
self.chain_id, self.eoa
),
}
}
}
Expand Down Expand Up @@ -416,12 +419,15 @@ impl EoaExecutorStore {
worker_id: worker_id.to_string(),
});
}
let conflict_worker_id = conn.get::<_, String>(&lock_key).await?;

// Lock exists, forcefully take it over
tracing::warn!(
eoa = ?self.eoa,
chain_id = self.chain_id,
worker_id = worker_id,
"Forcefully taking over EOA lock from stalled worker"
conflict_worker_id = ?conflict_worker_id,
"Forcefully taking over EOA lock from stalled worker."
);
// Force set - no expiry, only released by explicit takeover
let _: () = conn.set(&lock_key, worker_id).await?;
Expand Down
6 changes: 5 additions & 1 deletion executors/src/eoa/worker/confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct ConfirmedTransactionWithRichReceipt {

impl<C: Chain> EoaExecutorWorker<C> {
// ========== CONFIRM FLOW ==========
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
Comment on lines +30 to 31
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix tracing field: call the accessor and format properly

Same as send flow: use the accessor and % formatter.

-    #[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
+    #[tracing::instrument(skip_all, fields(worker_id = %self.store.worker_id()))]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
#[tracing::instrument(skip_all, fields(worker_id = %self.store.worker_id()))]
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
🤖 Prompt for AI Agents
In executors/src/eoa/worker/confirm.rs around lines 30 to 31, the tracing
attribute uses the store field directly instead of calling the accessor and
using the percent formatter; change the attribute to call the accessor and
format with `%` (e.g. replace fields(worker_id = self.store.worker_id) with
fields(worker_id = %self.store.worker_id())) so the worker_id is obtained via
its accessor and displayed correctly in the span.

// Get fresh on-chain transaction count
let current_chain_transaction_count = self
Expand All @@ -53,6 +53,10 @@ impl<C: Chain> EoaExecutorWorker<C> {
let cached_transaction_count = match self.store.get_cached_transaction_count().await {
Err(e) => match e {
TransactionStoreError::NonceSyncRequired { .. } => {
tracing::warn!(
cached_transaction_count = current_chain_transaction_count,
"Nonce sync required, store was uninitialized, updating cached transaction count with current chain transaction count"
);
self.store
.update_cached_transaction_count(current_chain_transaction_count)
.await?;
Expand Down
19 changes: 16 additions & 3 deletions executors/src/eoa/worker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ impl UserCancellable for EoaExecutorWorkerError {
// ========== SIMPLE ERROR CLASSIFICATION ==========
#[derive(Debug)]
pub enum SendErrorClassification {
PossiblySent, // "nonce too low", "already known" etc
DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc
PossiblySent, // "nonce too low", "already known" etc
DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc
DeterministicFailureNonRetryable, // Non-retryable deterministic failure
}

#[derive(PartialEq, Eq, Debug)]
Expand Down Expand Up @@ -185,11 +186,14 @@ pub fn classify_send_error(
if error_str.contains("malformed")
|| error_str.contains("gas limit")
|| error_str.contains("intrinsic gas too low")
|| error_str.contains("oversized")
{
return SendErrorClassification::DeterministicFailure;
}

if error_str.contains("oversized") {
return SendErrorClassification::DeterministicFailureNonRetryable;
}

tracing::warn!(
"Unknown send error: {}. PLEASE REPORT FOR ADDING CORRECT CLASSIFICATION [NOTIFY]",
error_str
Expand Down Expand Up @@ -305,6 +309,15 @@ impl SubmissionResult {
transaction: borrowed_transaction.clone().into(),
}
}
SendErrorClassification::DeterministicFailureNonRetryable => SubmissionResult {
result: SubmissionResultType::Fail(
EoaExecutorWorkerError::TransactionSendError {
message: format!("Transaction send failed: {rpc_error}"),
inner_error: rpc_error.to_engine_error(chain),
},
),
transaction: borrowed_transaction.clone().into(),
},
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions executors/src/eoa/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::eoa::authorization_cache::EoaAuthorizationCache;
use crate::eoa::store::{
AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult,
};
use crate::metrics::{calculate_duration_seconds, calculate_duration_seconds_from_twmq, current_timestamp_ms, record_eoa_job_processing_time};
use crate::metrics::{
calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
};
use crate::webhook::WebhookJobHandler;

pub mod confirm;
Expand Down Expand Up @@ -148,20 +150,22 @@ where
})
.map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;

let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token);

Comment on lines +153 to +154
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Align worker_id with PR objective: remove dependency on twmq lease_token

PR title: “Compute eoa worker ID independent of twmq lease token.” Current code still concatenates job.lease_token. Generate a worker_id that’s not derived from lease tokens. Optionally include eoa/chain for traceability.

Minimal fix:

-        let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token);
+        let worker_id = uuid::Uuid::new_v4().to_string();

Alternative (better observability):

-        let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token);
+        let worker_id = format!("{}:{}:{}", data.chain_id, data.eoa_address, uuid::Uuid::new_v4());

Verification script to ensure no other lease-token–derived worker IDs remain:


🏁 Script executed:

#!/bin/bash
# Find worker_id constructions that include lease_token
rg -n -C2 --type=rust 'worker_id.*lease_token'

Length of output: 443


Remove lease_token from worker_id generation

The code at executors/src/eoa/worker/mod.rs:153 still concatenates job.lease_token into the worker_id, which contradicts the PR’s objective to “compute eoa worker ID independent of twmq lease token.” Please update this to drop any dependency on lease_token.

Suggested fixes:

  • Minimal change:
  • let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token);
  • let worker_id = uuid::Uuid::new_v4().to_string();
    
    
  • Better observability (include chain and EOA address):
  • let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token);
  • let worker_id = format!(
  • "{}:{}:{}",
    
  • data.chain_id,
    
  • data.eoa_address,
    
  • uuid::Uuid::new_v4()
    
  • );
    
    

Verification confirms this is the only occurrence of lease_token in any worker_id construction:

$ rg -n -C2 --type=rust 'worker_id.*lease_token'
executors/src/eoa/worker/mod.rs:153:        let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token);
🤖 Prompt for AI Agents
In executors/src/eoa/worker/mod.rs around lines 153 to 154, the worker_id is
currently built by concatenating a UUID with job.lease_token; remove the
lease_token dependency so the worker ID is computed independent of the twmq
lease token. Fix by changing the construction to either (minimal) use only a
fresh UUID (e.g., UUID alone) or (preferred for observability) format the ID to
include chain and the EOA address (e.g., UUID:<chain>:<eoa_address>) instead of
including job.lease_token; ensure any variables used (chain or eoa address) are
available in scope and update any tests/logs that assert the old format.

// 2. CREATE SCOPED STORE (acquires lock)
let scoped = EoaExecutorStore::new(
self.redis.clone(),
self.namespace.clone(),
data.eoa_address,
data.chain_id,
)
.acquire_eoa_lock_aggressively(&job.lease_token)
.acquire_eoa_lock_aggressively(&worker_id)
.await
.map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())?;

let delegated_account = DelegatedAccount::new(data.eoa_address, chain.clone());

// if there's an error checking 7702 delegation here, we'll just assume it's not a minimal account for the pursposes of max in flight
// if there's an error checking 7702 delegation here, we'll just assume it's not a minimal account for the purposes of max in flight
let is_minimal_account = self
.authorization_cache
.is_minimal_account(&delegated_account)
Expand Down Expand Up @@ -192,7 +196,7 @@ where
let job_start_time = current_timestamp_ms();
let result = worker.execute_main_workflow().await?;
if let Err(e) = worker.release_eoa_lock().await {
tracing::error!(error = ?e, "Error releasing EOA lock");
tracing::error!(error = ?e, worker_id = worker_id, "Error releasing EOA lock");
}

// Record EOA job processing metrics
Expand Down
2 changes: 1 addition & 1 deletion executors/src/eoa/worker/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const HEALTH_CHECK_INTERVAL: u64 = 300; // 5 minutes in seconds

impl<C: Chain> EoaExecutorWorker<C> {
// ========== SEND FLOW ==========
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
pub async fn send_flow(&self) -> Result<u32, EoaExecutorWorkerError> {
Comment on lines +19 to 20
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix tracing field: call the accessor and format properly

AtomicEoaExecutorStore exposes worker_id(); using self.store.worker_id will fail if the field isn’t public. Also prefer % to use Display formatting.

Apply:

-    #[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
+    #[tracing::instrument(skip_all, fields(worker_id = %self.store.worker_id()))]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
pub async fn send_flow(&self) -> Result<u32, EoaExecutorWorkerError> {
#[tracing::instrument(skip_all, fields(worker_id = %self.store.worker_id()))]
pub async fn send_flow(&self) -> Result<u32, EoaExecutorWorkerError> {
🤖 Prompt for AI Agents
In executors/src/eoa/worker/send.rs around lines 19 to 20, the tracing field
references a non-public struct field and doesn't use Display formatting; change
the attribute to call the public accessor and use % for Display formatting so it
reads the worker id via self.store.worker_id() and formats it with % (e.g.,
fields(worker_id = %self.store.worker_id())); keep skip_all and the rest of the
attribute the same.

// 1. Get EOA health (initializes if needed) and check if we should update balance
let mut health = self.get_eoa_health().await?;
Expand Down