From cbf24b2d43159da0aa6f225760b58eae871d6a4b Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Wed, 6 Aug 2025 09:18:02 +0530 Subject: [PATCH 1/2] add max in flight limit for delegated account --- Cargo.lock | 1 + core/src/chain.rs | 2 +- executors/Cargo.toml | 1 + executors/src/eoa/authorization_cache.rs | 39 +++++++++++++ executors/src/eoa/mod.rs | 1 + executors/src/eoa/store/mod.rs | 1 + executors/src/eoa/store/submitted.rs | 31 +++++++++-- executors/src/eoa/worker/confirm.rs | 70 +++++++++++------------- executors/src/eoa/worker/mod.rs | 22 +++++++- server/src/execution_router/mod.rs | 25 ++------- server/src/main.rs | 16 ++++-- server/src/queue/manager.rs | 4 +- 12 files changed, 143 insertions(+), 70 deletions(-) create mode 100644 executors/src/eoa/authorization_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 754b8db..4b82510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2493,6 +2493,7 @@ dependencies = [ "futures", "hex", "hmac", + "moka", "rand 0.9.1", "reqwest", "serde", diff --git a/core/src/chain.rs b/core/src/chain.rs index 0813314..ee00ec2 100644 --- a/core/src/chain.rs +++ b/core/src/chain.rs @@ -260,5 +260,5 @@ impl ThirdwebChainConfig<'_> { } pub trait ChainService { - fn get_chain(&self, chain_id: u64) -> Result; + fn get_chain(&self, chain_id: u64) -> Result; } diff --git a/executors/Cargo.toml b/executors/Cargo.toml index fc95d89..809a23e 100644 --- a/executors/Cargo.toml +++ b/executors/Cargo.toml @@ -24,3 +24,4 @@ uuid = { version = "1.17.0", features = ["v4"] } chrono = "0.4.41" tokio = { version = "1.45.0", features = ["full"] } futures = "0.3.31" +moka = { version = "0.12.10", features = ["future"] } diff --git a/executors/src/eoa/authorization_cache.rs b/executors/src/eoa/authorization_cache.rs new file mode 100644 index 0000000..6a2c5d5 --- /dev/null +++ b/executors/src/eoa/authorization_cache.rs @@ -0,0 +1,39 @@ +use std::ops::Deref; + +use alloy::primitives::Address; +use engine_core::{chain::Chain, error::EngineError}; +use engine_eip7702_core::delegated_account::DelegatedAccount; +use moka::future::Cache; + +#[derive(Hash, Eq, PartialEq)] +pub struct AuthorizationCacheKey { + eoa_address: Address, + chain_id: u64, +} + +#[derive(Clone)] +pub struct EoaAuthorizationCache { + pub inner: moka::future::Cache, +} + +impl EoaAuthorizationCache { + pub fn new(cache: Cache) -> Self { + Self { inner: cache } + } + + pub async fn is_minimal_account( + &self, + delegated_account: &DelegatedAccount, + ) -> Result { + self.inner + .try_get_with( + AuthorizationCacheKey { + eoa_address: delegated_account.eoa_address, + chain_id: delegated_account.chain.chain_id(), + }, + delegated_account.is_minimal_account(), + ) + .await + .map_err(|e| e.deref().clone()) + } +} diff --git a/executors/src/eoa/mod.rs b/executors/src/eoa/mod.rs index d0ab4e6..40c79a0 100644 --- a/executors/src/eoa/mod.rs +++ b/executors/src/eoa/mod.rs @@ -1,3 +1,4 @@ +pub mod authorization_cache; pub mod error_classifier; pub mod events; pub mod store; diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index 5957404..469880d 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -341,6 +341,7 @@ impl From for SubmittedTransactionDehydrated { transaction_hash: data.signed_transaction.hash().to_string(), transaction_id: data.transaction_id.clone(), queued_at: data.queued_at, + submitted_at: EoaExecutorStore::now(), } } } diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index 92514ab..d3ad93f 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -38,12 +38,13 @@ pub struct SubmittedNoopTransaction { pub transaction_hash: String, } +/// String representation is: {transaction_hash}:{transaction_id}:{queued_at}:{submitted_at} pub type SubmittedTransactionStringWithNonce = (String, u64); impl SubmittedNoopTransaction { pub fn to_redis_string_with_nonce(&self) -> SubmittedTransactionStringWithNonce { ( - format!("{}:{}:0", self.transaction_hash, NO_OP_TRANSACTION_ID), + format!("{}:{}:0:0", self.transaction_hash, NO_OP_TRANSACTION_ID), self.nonce, ) } @@ -91,6 +92,7 @@ pub struct SubmittedTransactionDehydrated { pub nonce: u64, pub transaction_hash: String, pub transaction_id: String, + pub submitted_at: u64, pub queued_at: u64, } @@ -100,11 +102,14 @@ impl SubmittedTransactionDehydrated { .iter() .filter_map(|tx| { let parts: Vec<&str> = tx.0.split(':').collect(); + // this exists for backwards compatibility with old transactions + // remove after sufficient time for old transactions to be cleaned up if parts.len() == 3 { if let Ok(queued_at) = parts[2].parse::() { Some(SubmittedTransactionDehydrated { transaction_hash: parts[0].to_string(), transaction_id: parts[1].to_string(), + submitted_at: 0, nonce: tx.1, queued_at, }) @@ -112,9 +117,27 @@ impl SubmittedTransactionDehydrated { tracing::error!("Invalid queued_at timestamp: {}", tx.0); None } + } else if parts.len() == 4 { + let transaction_hash = parts[0].to_string(); + let transaction_id = parts[1].to_string(); + let queued_at = parts[2].parse::(); + let submitted_at = parts[3].parse::(); + + if let (Ok(queued_at), Ok(submitted_at)) = (queued_at, submitted_at) { + Some(SubmittedTransactionDehydrated { + transaction_hash, + transaction_id, + submitted_at, + nonce: tx.1, + queued_at, + }) + } else { + tracing::error!("Invalid queued_at or submitted_at timestamps: {}", tx.0); + None + } } else { tracing::error!( - "Invalid transaction format, expected 3 parts separated by ':': {}", + "Invalid transaction format, expected 3 or 4 parts separated by ':': {}", tx.0 ); None @@ -137,8 +160,8 @@ impl SubmittedTransactionDehydrated { pub fn to_redis_string_with_nonce(&self) -> SubmittedTransactionStringWithNonce { ( format!( - "{}:{}:{}", - self.transaction_hash, self.transaction_id, self.queued_at + "{}:{}:{}:{}", + self.transaction_hash, self.transaction_id, self.queued_at, self.submitted_at ), self.nonce, ) diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs index cc0d57b..f7b6244 100644 --- a/executors/src/eoa/worker/confirm.rs +++ b/executors/src/eoa/worker/confirm.rs @@ -3,13 +3,15 @@ use engine_core::{chain::Chain, error::AlloyRpcErrorToEngineError}; use serde::{Deserialize, Serialize}; use crate::eoa::{ + EoaExecutorStore, store::{ CleanupReport, ConfirmedTransaction, ReplacedTransaction, SubmittedTransactionDehydrated, - TransactionData, TransactionStoreError, + TransactionStoreError, }, worker::{ - error::{should_update_balance_threshold, EoaExecutorWorkerError}, EoaExecutorWorker - }, EoaExecutorStore, + EoaExecutorWorker, + error::{EoaExecutorWorkerError, should_update_balance_threshold}, + }, }; const NONCE_STALL_LIMIT_MS: u64 = 300_000; // 5 minutes in milliseconds - after this time, attempt gas bump @@ -248,37 +250,23 @@ impl EoaExecutorWorker { .get_submitted_transactions_for_nonce(expected_nonce) .await?; - if submitted_transactions.is_empty() { - tracing::debug!( - nonce = expected_nonce, - "No transactions found for stalled nonce, sending noop" - ); - - let noop_tx = self.send_noop_transaction(expected_nonce).await?; - self.store.process_noop_transactions(&[noop_tx]).await?; - return Ok(true); - } - // Load transaction data for all IDs and find the newest one - let mut newest_transaction: Option<(String, TransactionData)> = None; - let mut newest_submitted_at = 0u64; - - for SubmittedTransactionDehydrated { transaction_id, .. } in submitted_transactions { - if let Some(tx_data) = self.store.get_transaction_data(&transaction_id).await? { - // Find the most recent attempt for this transaction - if let Some(latest_attempt) = tx_data.attempts.last() { - let submitted_at = latest_attempt.sent_at; - if submitted_at > newest_submitted_at { - newest_submitted_at = submitted_at; - newest_transaction = Some((transaction_id, tx_data)); - } - } - } - } + let newest_transaction = if submitted_transactions.len() == 1 { + submitted_transactions.first() + } else { + submitted_transactions + .iter() + .max_by_key(|tx| tx.submitted_at) + }; + + let newest_transaction_data = match newest_transaction { + Some(tx) => self.store.get_transaction_data(&tx.transaction_id).await?, + None => None, + }; - if let Some((transaction_id, tx_data)) = newest_transaction { + if let Some(newest_transaction_data) = newest_transaction_data { tracing::info!( - transaction_id = ?transaction_id, + transaction_id = ?newest_transaction_data.transaction_id, nonce = expected_nonce, "Found newest transaction for gas bump" ); @@ -286,7 +274,7 @@ impl EoaExecutorWorker { // Get the latest attempt to extract gas values from // Build typed transaction -> manually bump -> sign let typed_tx = match self - .build_typed_transaction(&tx_data.user_request, expected_nonce) + .build_typed_transaction(&newest_transaction_data.user_request, expected_nonce) .await { Ok(tx) => tx, @@ -310,7 +298,7 @@ impl EoaExecutorWorker { } tracing::warn!( - transaction_id = ?transaction_id, + transaction_id = ?newest_transaction_data.transaction_id, nonce = expected_nonce, error = ?e, "Failed to build typed transaction for gas bump" @@ -320,13 +308,16 @@ impl EoaExecutorWorker { }; let bumped_typed_tx = self.apply_gas_bump_to_typed_transaction(typed_tx, 120); // 20% increase let bumped_tx = match self - .sign_transaction(bumped_typed_tx, &tx_data.user_request.signing_credential) + .sign_transaction( + bumped_typed_tx, + &newest_transaction_data.user_request.signing_credential, + ) .await { Ok(tx) => tx, Err(e) => { tracing::warn!( - transaction_id = ?transaction_id, + transaction_id = ?newest_transaction_data.transaction_id, nonce = expected_nonce, error = ?e, "Failed to sign transaction for gas bump" @@ -341,8 +332,9 @@ impl EoaExecutorWorker { &SubmittedTransactionDehydrated { nonce: expected_nonce, transaction_hash: bumped_tx.hash().to_string(), - transaction_id: transaction_id.to_string(), - queued_at: tx_data.created_at, + transaction_id: newest_transaction_data.transaction_id.clone(), + submitted_at: EoaExecutorStore::now(), + queued_at: newest_transaction_data.created_at, }, bumped_tx.clone(), ) @@ -353,7 +345,7 @@ impl EoaExecutorWorker { match self.chain.provider().send_tx_envelope(tx_envelope).await { Ok(_) => { tracing::info!( - transaction_id = ?transaction_id, + transaction_id = ?newest_transaction_data.transaction_id, nonce = expected_nonce, "Successfully sent gas bumped transaction" ); @@ -361,7 +353,7 @@ impl EoaExecutorWorker { } Err(e) => { tracing::warn!( - transaction_id = ?transaction_id, + transaction_id = ?newest_transaction_data.transaction_id, nonce = expected_nonce, error = ?e, "Failed to send gas bumped transaction" diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index f3ff3b2..d233b70 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -7,6 +7,7 @@ use engine_core::{ error::AlloyRpcErrorToEngineError, signer::EoaSigner, }; +use engine_eip7702_core::delegated_account::DelegatedAccount; use serde::{Deserialize, Serialize}; use std::{sync::Arc, time::Duration}; use twmq::Queue; @@ -18,6 +19,7 @@ use twmq::{ job::{BorrowedJob, JobResult, RequeuePosition, ToJobResult}, }; +use crate::eoa::authorization_cache::EoaAuthorizationCache; use crate::eoa::store::{ AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult, }; @@ -110,6 +112,7 @@ where { pub chain_service: Arc, pub webhook_queue: Arc>, + pub authorization_cache: EoaAuthorizationCache, pub redis: ConnectionManager, pub namespace: Option, @@ -155,6 +158,19 @@ where .await .map_err(|e| Into::::into(e).handle())?; + let delegated_account = DelegatedAccount::new(data.eoa_address, chain.clone()); + + // if there's an error checking 7702 delegation here, we'll just assume it's not a minimal account for the pursposes of max in flight + let is_minimal_account = self + .authorization_cache + .is_minimal_account(&delegated_account) + .await + .inspect_err(|e| { + tracing::error!(error = ?e, "Error checking 7702 delegation"); + }) + .ok() + .unwrap_or(false); + let worker = EoaExecutorWorker { store: scoped, chain, @@ -162,7 +178,11 @@ where chain_id: data.chain_id, noop_signing_credential: data.noop_signing_credential.clone(), - max_inflight: self.max_inflight, + max_inflight: if is_minimal_account { + 1 + } else { + self.max_inflight + }, max_recycled_nonces: self.max_recycled_nonces, webhook_queue: self.webhook_queue.clone(), signer: self.eoa_signer.clone(), diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs index 90b89ae..9f80ba1 100644 --- a/server/src/execution_router/mod.rs +++ b/server/src/execution_router/mod.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use alloy::primitives::{Address, U256}; +use alloy::primitives::U256; use engine_aa_core::smart_account::{DeterminedSmartAccount, SmartAccount, SmartAccountFromSalt}; use engine_core::{ chain::{ChainService, RpcCredentials}, @@ -21,6 +21,7 @@ use engine_executors::{ }, eoa::{ EoaExecutorJobHandler, EoaExecutorStore, EoaExecutorWorkerJobData, EoaTransactionRequest, + authorization_cache::EoaAuthorizationCache, }, external_bundler::{ confirm::UserOpConfirmationHandler, @@ -51,13 +52,7 @@ pub struct ExecutionRouter { pub transaction_registry: Arc, pub vault_client: Arc, pub chains: Arc, - pub authorization_cache: moka::future::Cache, -} - -#[derive(Hash, Eq, PartialEq)] -pub struct AuthorizationCacheKey { - eoa_address: Address, - chain_id: u64, + pub authorization_cache: EoaAuthorizationCache, } impl ExecutionRouter { @@ -410,17 +405,9 @@ impl ExecutionRouter { let delegated_account = DelegatedAccount::new(eoa_execution_options.from, chain); let is_minimal_account = self .authorization_cache - .try_get_with( - AuthorizationCacheKey { - eoa_address: eoa_execution_options.from, - chain_id: base_execution_options.chain_id, - }, - delegated_account.is_minimal_account(), - ) - .await; - - let is_minimal_account = - is_minimal_account.map_err(|e| EngineError::InternalError { + .is_minimal_account(&delegated_account) + .await + .map_err(|e| EngineError::InternalError { message: format!("Failed to check 7702 delegation: {e:?}"), })?; diff --git a/server/src/main.rs b/server/src/main.rs index 51d2af6..ada3a68 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Duration}; use engine_core::{signer::EoaSigner, userop::UserOpSigner}; +use engine_executors::eoa::authorization_cache::EoaAuthorizationCache; use thirdweb_core::{abi::ThirdwebAbiServiceBuilder, auth::ThirdwebAuth, iaw::IAWClient}; use thirdweb_engine::{ chains::ThirdwebChainService, @@ -53,12 +54,21 @@ async fn main() -> anyhow::Result<()> { let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client)); let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?; + let authorization_cache = EoaAuthorizationCache::new( + moka::future::Cache::builder() + .max_capacity(1024 * 1024 * 1024) + .time_to_live(Duration::from_secs(60 * 5)) + .time_to_idle(Duration::from_secs(60)) + .build(), + ); + let queue_manager = QueueManager::new( redis_client.clone(), &config.queue, chains.clone(), signer.clone(), eoa_signer.clone(), + authorization_cache.clone(), ) .await?; @@ -77,11 +87,7 @@ async fn main() -> anyhow::Result<()> { let execution_router = ExecutionRouter { namespace: config.queue.execution_namespace.clone(), redis: redis_client.get_connection_manager().await?, - authorization_cache: moka::future::Cache::builder() - .max_capacity(1024 * 1024 * 1024) - .time_to_live(Duration::from_secs(60 * 5)) - .time_to_idle(Duration::from_secs(60)) - .build(), + authorization_cache, webhook_queue: queue_manager.webhook_queue.clone(), external_bundler_send_queue: queue_manager.external_bundler_send_queue.clone(), userop_confirm_queue: queue_manager.userop_confirm_queue.clone(), diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index 84b027d..e945dc4 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -5,7 +5,7 @@ use alloy::transports::http::reqwest; use engine_core::error::EngineError; use engine_executors::{ eip7702_executor::{confirm::Eip7702ConfirmationHandler, send::Eip7702SendHandler}, - eoa::EoaExecutorJobHandler, + eoa::{authorization_cache::EoaAuthorizationCache, EoaExecutorJobHandler}, external_bundler::{ confirm::UserOpConfirmationHandler, deployment::{RedisDeploymentCache, RedisDeploymentLock}, @@ -49,6 +49,7 @@ impl QueueManager { chain_service: Arc, userop_signer: Arc, eoa_signer: Arc, + authorization_cache: EoaAuthorizationCache, ) -> Result { // Create transaction registry let transaction_registry = Arc::new(TransactionRegistry::new( @@ -212,6 +213,7 @@ impl QueueManager { webhook_queue: webhook_queue.clone(), namespace: queue_config.execution_namespace.clone(), redis: redis_client.get_connection_manager().await?, + authorization_cache, max_inflight: 100, max_recycled_nonces: 50, }; From 165a9873ddf7cddc23f3bb84c1994326ef402b8d Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Wed, 6 Aug 2025 09:42:21 +0530 Subject: [PATCH 2/2] add manual reset scheduling --- executors/src/eoa/store/atomic.rs | 4 ++ executors/src/eoa/store/mod.rs | 50 ++++++++++++++++--- executors/src/eoa/worker/confirm.rs | 8 +++ .../src/http/routes/admin/eoa_diagnostics.rs | 43 ++++++++++++++++ 4 files changed, 99 insertions(+), 6 deletions(-) diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index 22a4094..baa2c5a 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -506,6 +506,7 @@ impl AtomicEoaExecutorStore { let optimistic_key = self.optimistic_transaction_count_key_name(); let cached_nonce_key = self.last_transaction_count_key_name(); let recycled_key = self.recycled_nonces_zset_name(); + let manual_reset_key = self.manual_reset_key_name(); // Update health data only if it exists if let Some(ref health_json) = health_update { @@ -521,6 +522,9 @@ impl AtomicEoaExecutorStore { // Reset the recycled nonces pipeline.del(recycled_key); + + // Delete the manual reset key + pipeline.del(&manual_reset_key); }) .await } diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index 469880d..a1e0e3a 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -274,6 +274,19 @@ impl EoaExecutorStoreKeys { None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa), } } + + /// Manual reset key name. + /// + /// This holds a timestamp if a manual reset is scheduled. + pub fn manual_reset_key_name(&self) -> String { + match &self.namespace { + Some(ns) => format!( + "{ns}:eoa_executor:pending_manual_reset:{}:{}", + self.chain_id, self.eoa + ), + None => format!("eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa), + } + } } impl EoaExecutorStore { @@ -722,6 +735,15 @@ impl EoaExecutorStore { Ok(()) } + /// Schedule a manual reset for the EOA + pub async fn schedule_manual_reset(&self) -> Result<(), TransactionStoreError> { + let manual_reset_key = self.manual_reset_key_name(); + let mut conn = self.redis.clone(); + conn.set::<_, _, ()>(&manual_reset_key, EoaExecutorStore::now()) + .await?; + Ok(()) + } + /// Get count of submitted transactions awaiting confirmation pub async fn get_submitted_transactions_count(&self) -> Result { let submitted_key = self.submitted_transactions_zset_name(); @@ -753,7 +775,7 @@ impl EoaExecutorStore { } /// Get the current time in milliseconds - /// + /// /// Used as the canonical time representation for this store pub fn now() -> u64 { chrono::Utc::now().timestamp_millis().max(0) as u64 @@ -796,11 +818,13 @@ impl EoaExecutorStore { } /// Get all submitted transactions (raw data) - pub async fn get_all_submitted_transactions(&self) -> Result, TransactionStoreError> { + pub async fn get_all_submitted_transactions( + &self, + ) -> Result, TransactionStoreError> { let submitted_key = self.submitted_transactions_zset_name(); let mut conn = self.redis.clone(); - let submitted_data: Vec = + let submitted_data: Vec = conn.zrange_withscores(&submitted_key, 0, -1).await?; let submitted_txs: Vec = @@ -810,7 +834,10 @@ impl EoaExecutorStore { } /// Get attempts count for a specific transaction - pub async fn get_transaction_attempts_count(&self, transaction_id: &str) -> Result { + pub async fn get_transaction_attempts_count( + &self, + transaction_id: &str, + ) -> Result { let attempts_key = self.transaction_attempts_list_name(transaction_id); let mut conn = self.redis.clone(); @@ -819,12 +846,15 @@ impl EoaExecutorStore { } /// Get all transaction attempts for a specific transaction - pub async fn get_transaction_attempts(&self, transaction_id: &str) -> Result, TransactionStoreError> { + pub async fn get_transaction_attempts( + &self, + transaction_id: &str, + ) -> Result, TransactionStoreError> { let attempts_key = self.transaction_attempts_list_name(transaction_id); let mut conn = self.redis.clone(); let attempts_data: Vec = conn.lrange(&attempts_key, 0, -1).await?; - + let mut attempts = Vec::new(); for attempt_json in attempts_data { let attempt: TransactionAttempt = serde_json::from_str(&attempt_json)?; @@ -833,6 +863,14 @@ impl EoaExecutorStore { Ok(attempts) } + + pub async fn is_manual_reset_scheduled(&self) -> Result { + let manual_reset_key = self.manual_reset_key_name(); + let mut conn = self.redis.clone(); + + let manual_reset: Option = conn.get(&manual_reset_key).await?; + Ok(manual_reset.is_some()) + } } // Additional error types diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs index f7b6244..180baa3 100644 --- a/executors/src/eoa/worker/confirm.rs +++ b/executors/src/eoa/worker/confirm.rs @@ -1,6 +1,7 @@ use alloy::{primitives::B256, providers::Provider}; use engine_core::{chain::Chain, error::AlloyRpcErrorToEngineError}; use serde::{Deserialize, Serialize}; +use twmq::redis::AsyncCommands; use crate::eoa::{ EoaExecutorStore, @@ -43,6 +44,13 @@ impl EoaExecutorWorker { } })?; + if self.store.is_manual_reset_scheduled().await? { + tracing::info!("Manual reset scheduled, executing now"); + self.store + .reset_nonces(current_chain_transaction_count) + .await?; + } + let cached_transaction_count = match self.store.get_cached_transaction_count().await { Err(e) => match e { TransactionStoreError::NonceSyncRequired { .. } => { diff --git a/server/src/http/routes/admin/eoa_diagnostics.rs b/server/src/http/routes/admin/eoa_diagnostics.rs index 9f75e2f..82f9ef3 100644 --- a/server/src/http/routes/admin/eoa_diagnostics.rs +++ b/server/src/http/routes/admin/eoa_diagnostics.rs @@ -28,6 +28,7 @@ pub struct EoaStateResponse { pub recycled_nonces_count: u64, pub recycled_nonces: Vec, pub health: Option, + pub manual_reset_scheduled: bool, } #[derive(Debug, Serialize)] @@ -146,6 +147,12 @@ pub async fn get_eoa_state( }) })?; + let manual_reset_scheduled = store.is_manual_reset_scheduled().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to check if manual reset is scheduled: {e}"), + }) + })?; + let response = EoaStateResponse { eoa: eoa_address.to_string(), chain_id, @@ -156,6 +163,7 @@ pub async fn get_eoa_state( borrowed_count, recycled_nonces_count, recycled_nonces, + manual_reset_scheduled, health, }; @@ -369,6 +377,37 @@ pub async fn get_borrowed_transactions( Ok((StatusCode::OK, Json(SuccessResponse::new(transactions)))) } +/// Schedule a manual reset for the EOA + +#[debug_handler] +pub async fn schedule_manual_reset( + _auth: DiagnosticAuthExtractor, + State(state): State, + Path(eoa_chain): Path, +) -> Result { + let (eoa, chain_id) = parse_eoa_chain(&eoa_chain)?; + let eoa_address: Address = eoa.parse().map_err(|_| { + ApiEngineError(engine_core::error::EngineError::ValidationError { + message: "Invalid EOA address format".to_string(), + }) + })?; + + let eoa_queue = &state.queue_manager.eoa_executor_queue; + let redis_conn = eoa_queue.handler.redis.clone(); + + let namespace = eoa_queue.handler.namespace.clone(); + + let store = EoaExecutorStore::new(redis_conn, namespace, eoa_address, chain_id); + + store.schedule_manual_reset().await.map_err(|e| { + ApiEngineError(engine_core::error::EngineError::InternalError { + message: format!("Failed to schedule manual reset: {e}"), + }) + })?; + + Ok((StatusCode::OK, Json(SuccessResponse::new(())))) +} + // ===== HELPER FUNCTIONS ===== /// Parse eoa:chain_id format @@ -415,4 +454,8 @@ pub fn eoa_diagnostics_router() -> Router { "/admin/executors/eoa/{eoa_chain}/borrowed", axum::routing::get(get_borrowed_transactions), ) + .route( + "/admin/executors/eoa/{eoa_chain}/reset", + axum::routing::post(schedule_manual_reset), + ) }