diff --git a/core/src/rpc_clients/bundler.rs b/core/src/rpc_clients/bundler.rs index 3a64cb0..fbf3f3f 100644 --- a/core/src/rpc_clients/bundler.rs +++ b/core/src/rpc_clients/bundler.rs @@ -88,6 +88,12 @@ pub enum TwGetTransactionHashStatus { Success, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TwGetDelegationContractResponse { + pub delegation_contract: Address, +} + impl BundlerClient { /// Create a new bundler client with the given transport pub fn new(transport: impl IntoBoxTransport) -> Self { @@ -170,4 +176,9 @@ impl BundlerClient { Ok(response) } + + pub async fn tw_get_delegation_contract(&self) -> TransportResult { + let response: TwGetDelegationContractResponse = self.inner.request("tw_getDelegationContract", ()).await?; + Ok(response) + } } diff --git a/eip7702-core/src/constants.rs b/eip7702-core/src/constants.rs index 24d1574..06405b3 100644 --- a/eip7702-core/src/constants.rs +++ b/eip7702-core/src/constants.rs @@ -1,8 +1,9 @@ use alloy::primitives::{Address, address}; -/// The minimal account implementation address used for EIP-7702 delegation -pub const MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS: Address = - address!("0xD6999651Fc0964B9c6B444307a0ab20534a66560"); +// The minimal account implementation address used for EIP-7702 delegation +// pub const MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS: Address = +// address!("0xD6999651Fc0964B9c6B444307a0ab20534a66560"); +// NOTE!: do not hardcode. If needed later, use tw_getDelegationContract /// EIP-7702 delegation prefix bytes pub const EIP_7702_DELEGATION_PREFIX: [u8; 3] = [0xef, 0x01, 0x00]; diff --git a/eip7702-core/src/delegated_account.rs b/eip7702-core/src/delegated_account.rs index 8f329fa..d47eb8a 100644 --- a/eip7702-core/src/delegated_account.rs +++ b/eip7702-core/src/delegated_account.rs @@ -10,10 +10,7 @@ use engine_core::{ }; use rand::Rng; -use crate::constants::{ - EIP_7702_DELEGATION_CODE_LENGTH, EIP_7702_DELEGATION_PREFIX, - MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS, -}; +use crate::constants::{EIP_7702_DELEGATION_CODE_LENGTH, EIP_7702_DELEGATION_PREFIX}; /// Represents an EOA address that can have EIP-7702 delegation, associated with a specific chain #[derive(Clone, Debug)] @@ -62,21 +59,26 @@ impl DelegatedAccount { // Extract the target address from bytes 3-23 (20 bytes for address) // EIP-7702 format: 0xef0100 + 20 bytes address - let target_bytes = &code[3..23]; - let target_address = Address::from_slice(target_bytes); - // Compare with the minimal account implementation address - let is_delegated = target_address == MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS; + // NOTE!: skip the actual delegated target address check for now + // extremely unlikely that an EOA being used with engine is delegated to a non-minimal account + // Potential source for fringe edge cases, please verify delegated target address if debugging 7702 execution issues - tracing::debug!( - eoa_address = ?self.eoa_address, - target_address = ?target_address, - minimal_account_address = ?MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS, - has_delegation = is_delegated, - "EIP-7702 delegation check result" - ); + // let target_bytes = &code[3..23]; + // let target_address = Address::from_slice(target_bytes); + + // // Compare with the minimal account implementation address + // let is_delegated = target_address == MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS; + + // tracing::debug!( + // eoa_address = ?self.eoa_address, + // target_address = ?target_address, + // minimal_account_address = ?MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS, + // has_delegation = is_delegated, + // "EIP-7702 delegation check result" + // ); - Ok(is_delegated) + Ok(true) } /// Get the EOA address @@ -103,6 +105,7 @@ impl DelegatedAccount { &self, eoa_signer: &S, credentials: &SigningCredential, + delegation_contract: Address, ) -> Result { let nonce = self.get_nonce().await?; @@ -115,7 +118,7 @@ impl DelegatedAccount { .sign_authorization( signing_options, self.chain.chain_id(), - MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS, + delegation_contract, nonce, credentials, ) diff --git a/eip7702-core/src/transaction.rs b/eip7702-core/src/transaction.rs index 6e2e80a..0798f2d 100644 --- a/eip7702-core/src/transaction.rs +++ b/eip7702-core/src/transaction.rs @@ -193,12 +193,13 @@ impl MinimalAccountTransaction { mut self, signer: &S, credentials: &SigningCredential, + delegation_contract: Address, ) -> Result { if self.account.is_minimal_account().await? { return Ok(self); } - let authorization = self.account.sign_authorization(signer, credentials).await?; + let authorization = self.account.sign_authorization(signer, credentials, delegation_contract).await?; self.authorization = Some(authorization); Ok(self) } diff --git a/eip7702-core/tests/integration_tests.rs b/eip7702-core/tests/integration_tests.rs index 9dba63a..1c59fa5 100644 --- a/eip7702-core/tests/integration_tests.rs +++ b/eip7702-core/tests/integration_tests.rs @@ -27,7 +27,6 @@ use engine_core::{ transaction::InnerTransaction, }; use engine_eip7702_core::{ - constants::MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS, delegated_account::DelegatedAccount, transaction::{CallSpec, LimitType, SessionSpec, WrappedCalls}, }; @@ -290,6 +289,7 @@ struct TestSetup { user_address: Address, signer: MockEoaSigner, + delegation_contract: Option
, } const ANVIL_PORT: u16 = 8545; @@ -358,6 +358,7 @@ impl TestSetup { signer, mock_erc20_contract: contract, anvil_provider: provider, + delegation_contract: None, }) } @@ -376,22 +377,32 @@ impl TestSetup { Ok(()) } - async fn fetch_and_set_bytecode(&self) -> Result<(), Box> { + async fn fetch_and_set_bytecode(&mut self) -> Result<(), Box> { // Fetch bytecode from Base Sepolia let base_sepolia_url = "https://84532.rpc.thirdweb.com".parse()?; let base_sepolia_provider = ProviderBuilder::new().connect_http(base_sepolia_url); + let delegation_contract_response = self + .chain + .bundler_client() + .tw_get_delegation_contract() + .await?; + + // Store the delegation contract address for later use + self.delegation_contract = Some(delegation_contract_response.delegation_contract); + let bytecode = base_sepolia_provider - .get_code_at(MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS) + .get_code_at(delegation_contract_response.delegation_contract) .await?; // Set bytecode on our Anvil chain let _: () = self .anvil_provider - .anvil_set_code(MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS, bytecode) + .anvil_set_code(delegation_contract_response.delegation_contract, bytecode) .await?; println!( - "Set bytecode for minimal account implementation at {MINIMAL_ACCOUNT_IMPLEMENTATION_ADDRESS}" + "Set bytecode for minimal account implementation at {:?}", + delegation_contract_response.delegation_contract ); Ok(()) @@ -645,7 +656,7 @@ impl TestSetup { #[tokio::test] async fn test_eip7702_integration() -> Result<(), Box> { // Set up test environment - let setup = TestSetup::new().await?; + let mut setup = TestSetup::new().await?; // Step 1: Fetch and set bytecode from Base Sepolia setup.fetch_and_set_bytecode().await?; @@ -683,7 +694,11 @@ async fn test_eip7702_integration() -> Result<(), Box> { let developer_tx = developer_account .clone() .owner_transaction(&[mint_transaction]) - .add_authorization_if_needed(&setup.signer, &setup.developer_credentials) + .add_authorization_if_needed( + &setup.signer, + &setup.developer_credentials, + setup.delegation_contract.expect("Delegation contract should be set") + ) .await?; let (wrapped_calls_json, signature) = developer_tx @@ -723,7 +738,11 @@ async fn test_eip7702_integration() -> Result<(), Box> { // Step 8: Delegate user account (session key granter) // User signs authorization but executor broadcasts it (user has no funds) let user_authorization = user_account - .sign_authorization(&setup.signer, &setup.user_credentials) + .sign_authorization( + &setup.signer, + &setup.user_credentials, + setup.delegation_contract.expect("Delegation contract should be set") + ) .await?; // Executor broadcasts the user's delegation transaction diff --git a/executors/src/eip7702_executor/confirm.rs b/executors/src/eip7702_executor/confirm.rs index d8a0f59..fb3eb55 100644 --- a/executors/src/eip7702_executor/confirm.rs +++ b/executors/src/eip7702_executor/confirm.rs @@ -18,7 +18,10 @@ use twmq::{ use crate::eip7702_executor::send::Eip7702Sender; use crate::{ - metrics::{record_transaction_queued_to_confirmed, current_timestamp_ms, calculate_duration_seconds_from_twmq}, + metrics::{ + calculate_duration_seconds_from_twmq, current_timestamp_ms, + record_transaction_queued_to_confirmed, + }, transaction_registry::TransactionRegistry, webhook::{ WebhookJobHandler, @@ -194,10 +197,18 @@ where .bundler_client() .tw_get_transaction_hash(&job_data.bundler_transaction_id) .await - .map_err(|e| Eip7702ConfirmationError::TransactionHashError { - message: e.to_string(), + .map_err(|e| { + tracing::error!( + bundler_transaction_id = job_data.bundler_transaction_id, + sender_details = ?job_data.sender_details, + error = ?e, + "Failed to get transaction hash from bundler" + ); + Eip7702ConfirmationError::TransactionHashError { + message: e.to_string(), + } }) - .map_err_fail()?; + .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?; let transaction_hash = match transaction_hash_res { TwGetTransactionHashResponse::Success { transaction_hash } => { @@ -266,12 +277,17 @@ where "Transaction confirmed successfully" ); - // Record metrics if original timestamp is available - if let Some(original_timestamp) = job_data.original_queued_timestamp { - let confirmed_timestamp = current_timestamp_ms(); - let queued_to_confirmed_duration = calculate_duration_seconds_from_twmq(original_timestamp, confirmed_timestamp); - record_transaction_queued_to_confirmed("eip7702", job_data.chain_id, queued_to_confirmed_duration); - } + // Record metrics if original timestamp is available + if let Some(original_timestamp) = job_data.original_queued_timestamp { + let confirmed_timestamp = current_timestamp_ms(); + let queued_to_confirmed_duration = + calculate_duration_seconds_from_twmq(original_timestamp, confirmed_timestamp); + record_transaction_queued_to_confirmed( + "eip7702", + job_data.chain_id, + queued_to_confirmed_duration, + ); + } Ok(Eip7702ConfirmationResult { transaction_id: job_data.transaction_id.clone(), diff --git a/executors/src/eip7702_executor/delegation_cache.rs b/executors/src/eip7702_executor/delegation_cache.rs new file mode 100644 index 0000000..59917f7 --- /dev/null +++ b/executors/src/eip7702_executor/delegation_cache.rs @@ -0,0 +1,68 @@ +use std::{ops::Deref, sync::Arc}; + +use alloy::primitives::Address; +use engine_core::{ + chain::Chain, + error::{AlloyRpcErrorToEngineError, EngineError}, + rpc_clients::TwGetDelegationContractResponse, +}; +use moka::future::Cache; + +/// Cache key for delegation contract - uses chain_id as the key since each chain has one delegation contract +#[derive(Hash, Eq, PartialEq, Clone, Debug)] +pub struct DelegationContractCacheKey { + chain_id: u64, +} + +/// Cache for delegation contract addresses to avoid repeated RPC calls +#[derive(Clone)] +pub struct DelegationContractCache { + pub inner: moka::future::Cache, +} + +impl DelegationContractCache { + /// Create a new delegation contract cache with the provided moka cache + pub fn new(cache: Cache) -> Self { + Self { inner: cache } + } + + /// Get the delegation contract address for a chain, fetching it if not cached + pub async fn get_delegation_contract( + &self, + chain: &C, + ) -> Result { + let cache_key = DelegationContractCacheKey { + chain_id: chain.chain_id(), + }; + + // Use try_get_with for SWR behavior - this will fetch if not cached or expired + let result = self + .inner + .try_get_with(cache_key, async { + tracing::debug!( + chain_id = chain.chain_id(), + "Fetching delegation contract from bundler" + ); + + let TwGetDelegationContractResponse { + delegation_contract, + } = chain + .bundler_client() + .tw_get_delegation_contract() + .await + .map_err(|e| e.to_engine_bundler_error(chain))?; + + tracing::debug!( + chain_id = chain.chain_id(), + delegation_contract = ?delegation_contract, + "Successfully fetched and cached delegation contract" + ); + + Ok(delegation_contract) + }) + .await + .map_err(|e: Arc| e.deref().clone())?; + + Ok(result) + } +} diff --git a/executors/src/eip7702_executor/mod.rs b/executors/src/eip7702_executor/mod.rs index be241c9..6191a3e 100644 --- a/executors/src/eip7702_executor/mod.rs +++ b/executors/src/eip7702_executor/mod.rs @@ -1,2 +1,3 @@ pub mod send; -pub mod confirm; \ No newline at end of file +pub mod confirm; +pub mod delegation_cache; \ No newline at end of file diff --git a/executors/src/eip7702_executor/send.rs b/executors/src/eip7702_executor/send.rs index 64bc22a..2a6e706 100644 --- a/executors/src/eip7702_executor/send.rs +++ b/executors/src/eip7702_executor/send.rs @@ -22,7 +22,11 @@ use twmq::{ }; use crate::{ - metrics::{record_transaction_queued_to_sent, current_timestamp_ms, calculate_duration_seconds_from_twmq}, + eip7702_executor::delegation_cache::DelegationContractCache, + metrics::{ + calculate_duration_seconds_from_twmq, current_timestamp_ms, + record_transaction_queued_to_sent, + }, transaction_registry::TransactionRegistry, webhook::{ WebhookJobHandler, @@ -138,6 +142,7 @@ where pub webhook_queue: Arc>, pub confirm_queue: Arc>>, pub transaction_registry: Arc, + pub delegation_contract_cache: DelegationContractCache, } impl ExecutorStage for Eip7702SendHandler @@ -216,8 +221,23 @@ where None => account.owner_transaction(&job_data.transactions), }; + // Get delegation contract from cache + let delegation_contract = self + .delegation_contract_cache + .get_delegation_contract(transactions.account().chain()) + .await + .map_err(|e| Eip7702SendError::DelegationCheckFailed { inner_error: e }) + .map_err_nack( + Some(Duration::from_secs(2)), + twmq::job::RequeuePosition::Last, + )?; + let transactions = transactions - .add_authorization_if_needed(self.eoa_signer.deref(), &job_data.signing_credential) + .add_authorization_if_needed( + self.eoa_signer.deref(), + &job_data.signing_credential, + delegation_contract, + ) .await .map_err(|e| { let mapped_error = match e { @@ -281,10 +301,11 @@ where tracing::debug!(transaction_id = ?transaction_id, "EIP-7702 transaction sent to bundler"); - // Record metrics: transaction queued to sent - let sent_timestamp = current_timestamp_ms(); - let queued_to_sent_duration = calculate_duration_seconds_from_twmq(job.job.created_at, sent_timestamp); - record_transaction_queued_to_sent("eip7702", job_data.chain_id, queued_to_sent_duration); + // Record metrics: transaction queued to sent + let sent_timestamp = current_timestamp_ms(); + let queued_to_sent_duration = + calculate_duration_seconds_from_twmq(job.job.created_at, sent_timestamp); + record_transaction_queued_to_sent("eip7702", job_data.chain_id, queued_to_sent_duration); let sender_details = match session_key_target_address { Some(target_address) => Eip7702Sender::SessionKey { diff --git a/executors/src/eoa/worker/confirm.rs b/executors/src/eoa/worker/confirm.rs index a05e6ea..2fc0b11 100644 --- a/executors/src/eoa/worker/confirm.rs +++ b/executors/src/eoa/worker/confirm.rs @@ -76,28 +76,41 @@ impl EoaExecutorWorker { // No nonce progress - check if we should attempt gas bumping for stalled nonce let time_since_movement = now.saturating_sub(current_health.last_nonce_movement_at); - // if there are waiting transactions, we can attempt a gas bump - if time_since_movement > NONCE_STALL_LIMIT_MS && submitted_count > 0 { - tracing::info!( - time_since_movement = time_since_movement, - stall_timeout = NONCE_STALL_LIMIT_MS, - current_chain_nonce = current_chain_transaction_count, - cached_transaction_count = cached_transaction_count, - "Nonce has been stalled, attempting gas bump" - ); + // Check if EOA has sufficient funds before attempting gas bump + let is_out_of_funds = current_health.balance <= current_health.balance_threshold; - // Attempt gas bump for the next expected nonce - if let Err(e) = self - .attempt_gas_bump_for_stalled_nonce(current_chain_transaction_count) - .await - { + // if there are waiting transactions and EOA has sufficient funds, we can attempt a gas bump + if time_since_movement > NONCE_STALL_LIMIT_MS && submitted_count > 0 { + if is_out_of_funds { tracing::warn!( - error = ?e, - "Failed to attempt gas bump for stalled nonce" + time_since_movement = time_since_movement, + stall_timeout = NONCE_STALL_LIMIT_MS, + balance = ?current_health.balance, + balance_threshold = ?current_health.balance_threshold, + "Nonce has been stalled, but EOA is out of funds - skipping gas bump" + ); + } else { + tracing::info!( + time_since_movement = time_since_movement, + stall_timeout = NONCE_STALL_LIMIT_MS, + current_chain_nonce = current_chain_transaction_count, + cached_transaction_count = cached_transaction_count, + "Nonce has been stalled, attempting gas bump" ); + + // Attempt gas bump for the next expected nonce + if let Err(e) = self + .attempt_gas_bump_for_stalled_nonce(current_chain_transaction_count) + .await + { + tracing::warn!( + error = ?e, + "Failed to attempt gas bump for stalled nonce" + ); + } } } - + // Check if EOA is stuck and record metric using the clean EoaMetrics abstraction let time_since_movement_seconds = time_since_movement as f64 / 1000.0; if self.store.eoa_metrics.is_stuck(time_since_movement) { @@ -106,14 +119,16 @@ impl EoaExecutorWorker { stuck_threshold = self.store.eoa_metrics.stuck_threshold_seconds, eoa = ?self.eoa, chain_id = self.chain_id, + out_of_funds = is_out_of_funds, "EOA is stuck - nonce hasn't moved for too long" ); - - // Record stuck EOA metric (low cardinality - only problematic EOAs) + + // Record stuck EOA metric (low cardinality - only problematic EOAs) with out_of_funds status self.store.eoa_metrics.record_stuck_eoa( self.eoa, self.chain_id, - time_since_movement_seconds + time_since_movement_seconds, + is_out_of_funds, ); } diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index f019754..70090d3 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -22,6 +22,7 @@ use twmq::{ use crate::eoa::authorization_cache::EoaAuthorizationCache; use crate::eoa::store::{ AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult, + TransactionStoreError, }; use crate::metrics::{ EoaMetrics, calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time, @@ -241,13 +242,25 @@ where self.soft_release_eoa_lock(&job.job.data).await; } + #[tracing::instrument(name = "eoa_executor_worker_on_fail", skip_all, fields(eoa = ?job.job.data.eoa_address, chain_id = job.job.data.chain_id, job_id = ?job.job.id))] async fn on_fail( &self, job: &BorrowedJob, - _fail_data: FailHookData<'_, Self::ErrorData>, + fail_data: FailHookData<'_, Self::ErrorData>, _tx: &mut TransactionContext<'_>, ) { - self.soft_release_eoa_lock(&job.job.data).await; + if let EoaExecutorWorkerError::StoreError { inner_error, .. } = &fail_data.error { + if let TransactionStoreError::LockLost { .. } = &inner_error { + tracing::error!( + eoa = ?job.job.data.eoa_address, + chain_id = job.job.data.chain_id, + "Encountered lock lost store error, skipping soft release of EOA lock" + ); + return; + } + } else { + self.soft_release_eoa_lock(&job.job.data).await; + } } } @@ -479,7 +492,6 @@ impl EoaExecutorWorker { /// This method ensures the health data is always available for the worker async fn get_eoa_health(&self) -> Result { let store_health = self.store.get_eoa_health().await?; - let now = chrono::Utc::now().timestamp_millis().max(0) as u64; match store_health { Some(health) => Ok(health), @@ -500,6 +512,8 @@ impl EoaExecutorWorker { } })?; + let now = current_timestamp_ms(); + let health = EoaHealth { balance, balance_threshold: U256::ZERO, diff --git a/executors/src/metrics.rs b/executors/src/metrics.rs index f2e9e63..1cf982a 100644 --- a/executors/src/metrics.rs +++ b/executors/src/metrics.rs @@ -67,7 +67,7 @@ impl ExecutorMetrics { "tw_engine_executor_eoa_stuck_duration_seconds", "Duration since last nonce movement for EOAs that are considered stuck" ).buckets(vec![200.0, 300.0, 600.0, 1200.0, 1800.0, 3600.0, 7200.0, 14400.0]), - &["eoa_address", "chain_id"], + &["eoa_address", "chain_id", "out_of_funds"], registry )?; @@ -202,12 +202,12 @@ impl EoaMetrics { } /// Record stuck EOA metric when nonce hasn't moved for too long - pub fn record_stuck_eoa(&self, eoa_address: alloy::primitives::Address, chain_id: u64, time_since_last_movement_seconds: f64) { + pub fn record_stuck_eoa(&self, eoa_address: alloy::primitives::Address, chain_id: u64, time_since_last_movement_seconds: f64, out_of_funds: bool) { // Only record if EOA is actually stuck (exceeds threshold) if time_since_last_movement_seconds > self.stuck_threshold_seconds as f64 { let metrics = get_metrics(); metrics.eoa_stuck_duration - .with_label_values(&[&eoa_address.to_string(), &chain_id.to_string()]) + .with_label_values(&[&eoa_address.to_string(), &chain_id.to_string(), &out_of_funds.to_string()]) .observe(time_since_last_movement_seconds); } } @@ -278,7 +278,7 @@ mod tests { eoa_metrics.record_transaction_sent(test_address, 1, 15.0); // Will record degradation (above threshold) eoa_metrics.record_transaction_confirmed(test_address, 1, 60.0); // Won't record degradation (below threshold) eoa_metrics.record_transaction_confirmed(test_address, 1, 180.0); // Will record degradation (above threshold) - eoa_metrics.record_stuck_eoa(test_address, 1, 900.0); // Will record stuck EOA + eoa_metrics.record_stuck_eoa(test_address, 1, 900.0, false); // Will record stuck EOA // Test that default metrics can be exported let metrics_output = export_default_metrics().expect("Should be able to export default metrics"); diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index c27e002..f75bfd4 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -188,6 +188,15 @@ impl QueueManager { .await? .arc(); + // Create delegation contract cache for EIP-7702 + let delegation_contract_cache = engine_executors::eip7702_executor::delegation_cache::DelegationContractCache::new( + moka::future::Cache::builder() + .max_capacity(10000) // Large capacity since it's a single entry per chain + .time_to_live(Duration::from_secs(24 * 60 * 60)) // 24 hours as requested + .time_to_idle(Duration::from_secs(24 * 60 * 60)) // Also 24 hours for TTI + .build(), + ); + // Create EIP-7702 send queue let eip7702_send_handler = Eip7702SendHandler { chain_service: chain_service.clone(), @@ -195,6 +204,7 @@ impl QueueManager { webhook_queue: webhook_queue.clone(), confirm_queue: eip7702_confirm_queue.clone(), transaction_registry: transaction_registry.clone(), + delegation_contract_cache, }; let eip7702_send_queue = Queue::builder() diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index f2c8dda..a52eaed 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -574,6 +574,8 @@ impl Queue { self: &Arc, batch_size: usize, ) -> RedisResult>> { + let pop_id = nanoid::nanoid!(4); + // Lua script that does: // 1. Clean up expired leases (with lease token validation) // 2. Process pending cancellations @@ -582,8 +584,9 @@ impl Queue { let script = redis::Script::new( r#" local now = tonumber(ARGV[1]) - local batch_size = tonumber(ARGV[2]) - local lease_seconds = tonumber(ARGV[3]) + local pop_id = ARGV[2] + local batch_size = tonumber(ARGV[3]) + local lease_seconds = tonumber(ARGV[4]) local queue_id = KEYS[1] local delayed_zset_name = KEYS[2] @@ -707,7 +710,7 @@ impl Queue { local attempts = redis.call('HINCRBY', job_meta_hash_name, 'attempts', 1) -- Generate unique lease token - local lease_token = now .. '_' .. job_id .. '_' .. attempts + local lease_token = now .. '_' .. job_id .. '_' .. attempts .. '_' .. pop_id -- Create separate lease key with TTL local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token @@ -748,6 +751,7 @@ impl Queue { .key(self.failed_list_name()) .key(self.success_list_name()) .arg(now) + .arg(pop_id) .arg(batch_size) .arg(self.options.lease_duration.as_secs()) .invoke_async(&mut self.redis.clone())