Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,5 @@ impl ThirdwebChainConfig<'_> {
}

pub trait ChainService {
fn get_chain(&self, chain_id: u64) -> Result<impl Chain, EngineError>;
fn get_chain(&self, chain_id: u64) -> Result<impl Chain + Clone, EngineError>;
}
1 change: 1 addition & 0 deletions executors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ uuid = { version = "1.17.0", features = ["v4"] }
chrono = "0.4.41"
tokio = { version = "1.45.0", features = ["full"] }
futures = "0.3.31"
moka = { version = "0.12.10", features = ["future"] }
39 changes: 39 additions & 0 deletions executors/src/eoa/authorization_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::ops::Deref;

use alloy::primitives::Address;
use engine_core::{chain::Chain, error::EngineError};
use engine_eip7702_core::delegated_account::DelegatedAccount;
use moka::future::Cache;

#[derive(Hash, Eq, PartialEq)]
pub struct AuthorizationCacheKey {
eoa_address: Address,
chain_id: u64,
}

#[derive(Clone)]
pub struct EoaAuthorizationCache {
pub inner: moka::future::Cache<AuthorizationCacheKey, bool>,
}

impl EoaAuthorizationCache {
pub fn new(cache: Cache<AuthorizationCacheKey, bool>) -> Self {
Self { inner: cache }
}

pub async fn is_minimal_account<C: Chain>(
&self,
delegated_account: &DelegatedAccount<C>,
) -> Result<bool, EngineError> {
self.inner
.try_get_with(
AuthorizationCacheKey {
eoa_address: delegated_account.eoa_address,
chain_id: delegated_account.chain.chain_id(),
},
delegated_account.is_minimal_account(),
)
.await
.map_err(|e| e.deref().clone())
}
}
1 change: 1 addition & 0 deletions executors/src/eoa/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod authorization_cache;
pub mod error_classifier;
pub mod events;
pub mod store;
Expand Down
4 changes: 4 additions & 0 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ impl AtomicEoaExecutorStore {
let optimistic_key = self.optimistic_transaction_count_key_name();
let cached_nonce_key = self.last_transaction_count_key_name();
let recycled_key = self.recycled_nonces_zset_name();
let manual_reset_key = self.manual_reset_key_name();

// Update health data only if it exists
if let Some(ref health_json) = health_update {
Expand All @@ -521,6 +522,9 @@ impl AtomicEoaExecutorStore {

// Reset the recycled nonces
pipeline.del(recycled_key);

// Delete the manual reset key
pipeline.del(&manual_reset_key);
})
.await
}
Expand Down
51 changes: 45 additions & 6 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ impl EoaExecutorStoreKeys {
None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa),
}
}

/// Manual reset key name.
///
/// This holds a timestamp if a manual reset is scheduled.
pub fn manual_reset_key_name(&self) -> String {
match &self.namespace {
Some(ns) => format!(
"{ns}:eoa_executor:pending_manual_reset:{}:{}",
self.chain_id, self.eoa
),
None => format!("eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa),
}
}
}

impl EoaExecutorStore {
Expand Down Expand Up @@ -341,6 +354,7 @@ impl From<BorrowedTransactionData> for SubmittedTransactionDehydrated {
transaction_hash: data.signed_transaction.hash().to_string(),
transaction_id: data.transaction_id.clone(),
queued_at: data.queued_at,
submitted_at: EoaExecutorStore::now(),
}
}
}
Expand Down Expand Up @@ -721,6 +735,15 @@ impl EoaExecutorStore {
Ok(())
}

/// Schedule a manual reset for the EOA
pub async fn schedule_manual_reset(&self) -> Result<(), TransactionStoreError> {
let manual_reset_key = self.manual_reset_key_name();
let mut conn = self.redis.clone();
conn.set::<_, _, ()>(&manual_reset_key, EoaExecutorStore::now())
.await?;
Ok(())
}

/// Get count of submitted transactions awaiting confirmation
pub async fn get_submitted_transactions_count(&self) -> Result<u64, TransactionStoreError> {
let submitted_key = self.submitted_transactions_zset_name();
Expand Down Expand Up @@ -752,7 +775,7 @@ impl EoaExecutorStore {
}

/// Get the current time in milliseconds
///
///
/// Used as the canonical time representation for this store
pub fn now() -> u64 {
chrono::Utc::now().timestamp_millis().max(0) as u64
Expand Down Expand Up @@ -795,11 +818,13 @@ impl EoaExecutorStore {
}

/// Get all submitted transactions (raw data)
pub async fn get_all_submitted_transactions(&self) -> Result<Vec<SubmittedTransactionDehydrated>, TransactionStoreError> {
pub async fn get_all_submitted_transactions(
&self,
) -> Result<Vec<SubmittedTransactionDehydrated>, TransactionStoreError> {
let submitted_key = self.submitted_transactions_zset_name();
let mut conn = self.redis.clone();

let submitted_data: Vec<SubmittedTransactionStringWithNonce> =
let submitted_data: Vec<SubmittedTransactionStringWithNonce> =
conn.zrange_withscores(&submitted_key, 0, -1).await?;

let submitted_txs: Vec<SubmittedTransactionDehydrated> =
Expand All @@ -809,7 +834,10 @@ impl EoaExecutorStore {
}

/// Get attempts count for a specific transaction
pub async fn get_transaction_attempts_count(&self, transaction_id: &str) -> Result<u64, TransactionStoreError> {
pub async fn get_transaction_attempts_count(
&self,
transaction_id: &str,
) -> Result<u64, TransactionStoreError> {
let attempts_key = self.transaction_attempts_list_name(transaction_id);
let mut conn = self.redis.clone();

Expand All @@ -818,12 +846,15 @@ impl EoaExecutorStore {
}

/// Get all transaction attempts for a specific transaction
pub async fn get_transaction_attempts(&self, transaction_id: &str) -> Result<Vec<TransactionAttempt>, TransactionStoreError> {
pub async fn get_transaction_attempts(
&self,
transaction_id: &str,
) -> Result<Vec<TransactionAttempt>, TransactionStoreError> {
let attempts_key = self.transaction_attempts_list_name(transaction_id);
let mut conn = self.redis.clone();

let attempts_data: Vec<String> = conn.lrange(&attempts_key, 0, -1).await?;

let mut attempts = Vec::new();
for attempt_json in attempts_data {
let attempt: TransactionAttempt = serde_json::from_str(&attempt_json)?;
Expand All @@ -832,6 +863,14 @@ impl EoaExecutorStore {

Ok(attempts)
}

pub async fn is_manual_reset_scheduled(&self) -> Result<bool, TransactionStoreError> {
let manual_reset_key = self.manual_reset_key_name();
let mut conn = self.redis.clone();

let manual_reset: Option<u64> = conn.get(&manual_reset_key).await?;
Ok(manual_reset.is_some())
}
}

// Additional error types
Expand Down
31 changes: 27 additions & 4 deletions executors/src/eoa/store/submitted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ pub struct SubmittedNoopTransaction {
pub transaction_hash: String,
}

/// String representation is: {transaction_hash}:{transaction_id}:{queued_at}:{submitted_at}
pub type SubmittedTransactionStringWithNonce = (String, u64);

impl SubmittedNoopTransaction {
pub fn to_redis_string_with_nonce(&self) -> SubmittedTransactionStringWithNonce {
(
format!("{}:{}:0", self.transaction_hash, NO_OP_TRANSACTION_ID),
format!("{}:{}:0:0", self.transaction_hash, NO_OP_TRANSACTION_ID),
self.nonce,
)
}
Expand Down Expand Up @@ -91,6 +92,7 @@ pub struct SubmittedTransactionDehydrated {
pub nonce: u64,
pub transaction_hash: String,
pub transaction_id: String,
pub submitted_at: u64,
pub queued_at: u64,
}

Expand All @@ -100,21 +102,42 @@ impl SubmittedTransactionDehydrated {
.iter()
.filter_map(|tx| {
let parts: Vec<&str> = tx.0.split(':').collect();
// this exists for backwards compatibility with old transactions
// remove after sufficient time for old transactions to be cleaned up
if parts.len() == 3 {
if let Ok(queued_at) = parts[2].parse::<u64>() {
Some(SubmittedTransactionDehydrated {
transaction_hash: parts[0].to_string(),
transaction_id: parts[1].to_string(),
submitted_at: 0,
nonce: tx.1,
queued_at,
})
} else {
tracing::error!("Invalid queued_at timestamp: {}", tx.0);
None
}
} else if parts.len() == 4 {
let transaction_hash = parts[0].to_string();
let transaction_id = parts[1].to_string();
let queued_at = parts[2].parse::<u64>();
let submitted_at = parts[3].parse::<u64>();

if let (Ok(queued_at), Ok(submitted_at)) = (queued_at, submitted_at) {
Some(SubmittedTransactionDehydrated {
transaction_hash,
transaction_id,
submitted_at,
nonce: tx.1,
queued_at,
})
} else {
tracing::error!("Invalid queued_at or submitted_at timestamps: {}", tx.0);
None
}
} else {
tracing::error!(
"Invalid transaction format, expected 3 parts separated by ':': {}",
"Invalid transaction format, expected 3 or 4 parts separated by ':': {}",
tx.0
);
None
Expand All @@ -137,8 +160,8 @@ impl SubmittedTransactionDehydrated {
pub fn to_redis_string_with_nonce(&self) -> SubmittedTransactionStringWithNonce {
(
format!(
"{}:{}:{}",
self.transaction_hash, self.transaction_id, self.queued_at
"{}:{}:{}:{}",
self.transaction_hash, self.transaction_id, self.queued_at, self.submitted_at
),
self.nonce,
)
Expand Down
Loading