Skip to content

Commit ac2ed37

Browse files
authored
minimal auto reset (#51)
* Revert to v0.1.6 - undo PR #48, #49, and #50
* keep bun init
* add health check
* custom webhook http client
* auto nonce reset
* health check interval MS
* fallback directly to nonce reset
* remove script
* remove
1 parent 3997517 commit ac2ed37

File tree

20 files changed

+372
-583
lines changed

20 files changed

+372
-583
lines changed

Cargo.lock

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

executors/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,4 +27,3 @@ futures = "0.3.31"
2727
moka = { version = "0.12.10", features = ["future"] }
2828
prometheus = "0.13.4"
2929
lazy_static = "1.5.0"
30-
tokio-retry2 = {version = "0.6.0", features = ["jitter"]}

executors/src/eoa/error_classifier.rs

Lines changed: 0 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -38,9 +38,6 @@ pub enum EoaExecutionError {
3838
message: String,
3939
inner_error: Option<EngineError>,
4040
},
41-
42-
/// Thirdweb support error that requires retry in place
43-
ThirdwebSupportError { message: String },
4441
}
4542

4643
/// Recovery strategy for an EOA execution error
@@ -118,10 +115,6 @@ impl EoaErrorMapper {
118115
EoaExecutionError::AccountError {
119116
message: message.to_string(),
120117
}
121-
} else if msg_lower.contains("we are not able to process your request at this time") {
122-
EoaExecutionError::ThirdwebSupportError {
123-
message: message.to_string(),
124-
}
125118
} else {
126119
// Not an actionable error - let engine error handle it
127120
EoaExecutionError::RpcError {
@@ -211,14 +204,6 @@ impl EoaErrorMapper {
211204
retry_delay: None,
212205
},
213206

214-
EoaExecutionError::ThirdwebSupportError { .. } => RecoveryStrategy {
215-
queue_confirmation: false,
216-
recycle_nonce: false,
217-
needs_resync: false,
218-
retryable: true,
219-
retry_delay: Some(Duration::from_secs(1)), // Short delay for retry in place
220-
},
221-
222207
EoaExecutionError::RpcError { .. } => {
223208
// This should not be used - let engine error handle it
224209
RecoveryStrategy {
@@ -246,7 +231,6 @@ impl EoaExecutionError {
246231
| EoaExecutionError::GasError { message }
247232
| EoaExecutionError::PoolLimitExceeded { message }
248233
| EoaExecutionError::AccountError { message }
249-
| EoaExecutionError::ThirdwebSupportError { message }
250234
| EoaExecutionError::RpcError { message, .. } => message,
251235
}
252236
}

executors/src/eoa/store/atomic.rs

Lines changed: 2 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -503,8 +503,7 @@ impl AtomicEoaExecutorStore {
503503
// Add new hash:id to submitted (keeping old ones)
504504
pipeline.zadd(&submitted_key, &submitted_transaction_string, nonce);
505505

506-
// Create hash-to-ID mapping for the new gas bump hash
507-
// Keep ALL hashes until transaction is fully cleaned up for re-org protection
506+
// Still maintain separate hash-to-ID mapping for backward compatibility
508507
pipeline.set(&hash_to_id_key, &submitted_transaction.transaction_id);
509508

510509
// Simply push the new attempt to the attempts list
@@ -555,7 +554,6 @@ impl AtomicEoaExecutorStore {
555554
pending_transaction: &PendingTransaction,
556555
error: EoaExecutorWorkerError,
557556
webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
558-
failed_transaction_expiry_seconds: u64,
559557
) -> Result<(), TransactionStoreError> {
560558
self.with_lock_check(|pipeline| {
561559
let pending_key = self.pending_transactions_zset_name();
@@ -565,23 +563,11 @@ impl AtomicEoaExecutorStore {
565563
// Remove from pending state
566564
pipeline.zrem(&pending_key, &pending_transaction.transaction_id);
567565

568-
// Update transaction data with failure and set expiry
566+
// Update transaction data with failure
569567
pipeline.hset(&tx_data_key, "completed_at", now);
570568
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
571569
pipeline.hset(&tx_data_key, "status", "failed");
572570

573-
// Set expiry on all related keys for failed transactions
574-
let transaction_keys =
575-
self.get_all_transaction_keys(&pending_transaction.transaction_id);
576-
577-
let ttl: i64 = i64::try_from(failed_transaction_expiry_seconds)
578-
.unwrap_or(i64::MAX)
579-
.max(1);
580-
581-
for key in transaction_keys {
582-
pipeline.expire(&key, ttl);
583-
}
584-
585571
let event = EoaExecutorEvent {
586572
transaction_id: pending_transaction.transaction_id.clone(),
587573
address: pending_transaction.user_request.from,
@@ -627,14 +613,12 @@ impl AtomicEoaExecutorStore {
627613
&self,
628614
results: Vec<SubmissionResult>,
629615
webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
630-
failed_transaction_expiry_seconds: u64,
631616
) -> Result<BorrowedProcessingReport, TransactionStoreError> {
632617
self.execute_with_watch_and_retry(&ProcessBorrowedTransactions {
633618
results,
634619
keys: &self.keys,
635620
webhook_queue,
636621
eoa_metrics: &self.eoa_metrics,
637-
failed_transaction_expiry_seconds,
638622
})
639623
.await
640624
}

executors/src/eoa/store/borrowed.rs

Lines changed: 8 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ use crate::eoa::{
1212
},
1313
worker::error::EoaExecutorWorkerError,
1414
};
15-
use crate::metrics::{EoaMetrics, calculate_duration_seconds, current_timestamp_ms};
15+
use crate::metrics::{current_timestamp_ms, calculate_duration_seconds, EoaMetrics};
1616
use crate::webhook::{WebhookJobHandler, queue_webhook_envelopes};
1717

1818
#[derive(Debug, Clone)]
@@ -41,7 +41,6 @@ pub struct ProcessBorrowedTransactions<'a> {
4141
pub keys: &'a EoaExecutorStoreKeys,
4242
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
4343
pub eoa_metrics: &'a EoaMetrics,
44-
pub failed_transaction_expiry_seconds: u64,
4544
}
4645

4746
#[derive(Debug, Default)]
@@ -122,13 +121,15 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
122121
SubmissionResultType::Success => {
123122
// Record metrics: transaction queued to sent
124123
let sent_timestamp = current_timestamp_ms();
125-
let queued_to_sent_duration =
126-
calculate_duration_seconds(result.transaction.queued_at, sent_timestamp);
124+
let queued_to_sent_duration = calculate_duration_seconds(
125+
result.transaction.queued_at,
126+
sent_timestamp
127+
);
127128
// Record metrics using the clean EoaMetrics abstraction
128129
self.eoa_metrics.record_transaction_sent(
129130
self.keys.eoa,
130131
self.keys.chain_id,
131-
queued_to_sent_duration,
132+
queued_to_sent_duration
132133
);
133134

134135
// Add to submitted zset
@@ -188,7 +189,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
188189
// Update transaction data status
189190
let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
190191
pipeline.hset(&tx_data_key, "status", "pending");
191-
192+
192193
// ask for this nonce to be recycled because we did not consume the nonce
193194
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
194195

@@ -218,22 +219,12 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
218219
report.moved_to_pending += 1;
219220
}
220221
SubmissionResultType::Fail(err) => {
221-
// Mark as failed and set expiry
222+
// Mark as failed
222223
let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
223224
pipeline.hset(&tx_data_key, "status", "failed");
224225
pipeline.hset(&tx_data_key, "completed_at", now);
225226
pipeline.hset(&tx_data_key, "failure_reason", err.to_string());
226227

227-
let ttl: i64 = i64::try_from(self.failed_transaction_expiry_seconds)
228-
.unwrap_or(i64::MAX)
229-
.max(1);
230-
231-
// Set expiry on all related keys for failed transactions
232-
let transaction_keys = self.keys.get_all_transaction_keys(transaction_id);
233-
for key in transaction_keys {
234-
pipeline.expire(&key, ttl);
235-
}
236-
237228
// ask for this nonce to be recycled because we did not consume the nonce
238229
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
239230

executors/src/eoa/store/mod.rs

Lines changed: 25 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -186,26 +186,6 @@ impl EoaExecutorStoreKeys {
186186
}
187187
}
188188

189-
/// Get all Redis keys related to a transaction for cleanup
190-
pub fn get_all_transaction_keys(&self, transaction_id: &str) -> Vec<String> {
191-
vec![
192-
self.transaction_data_key_name(transaction_id),
193-
self.transaction_attempts_list_name(transaction_id),
194-
]
195-
}
196-
197-
/// Get all Redis keys related to a transaction including hash mappings for cleanup
198-
pub fn get_all_transaction_keys_with_hashes(&self, transaction_id: &str, transaction_hashes: &[String]) -> Vec<String> {
199-
let mut keys = self.get_all_transaction_keys(transaction_id);
200-
201-
// Add hash-to-id mappings
202-
for hash in transaction_hashes {
203-
keys.push(self.transaction_hash_to_id_key_name(hash));
204-
}
205-
206-
keys
207-
}
208-
209189
/// Name of the hashmap that maps `transaction_id` -> `BorrowedTransactionData`
210190
///
211191
/// This is used for crash recovery. Before submitting a transaction, we atomically move from pending to this borrowed hashmap.
@@ -420,6 +400,13 @@ impl NonceType {
420400
}
421401
}
422402

403+
pub struct EoaExecutorCounts {
404+
pub pending_transactions: u64,
405+
pub submitted_transactions: u64,
406+
pub borrowed_transactions: u64,
407+
pub recycled_nonces: u64,
408+
}
409+
423410
impl EoaExecutorStore {
424411
/// Aggressively acquire EOA lock, forcefully taking over from stalled workers
425412
///
@@ -533,6 +520,24 @@ impl EoaExecutorStore {
533520
}
534521
}
535522

523+
pub async fn get_all_counts(&self) -> Result<EoaExecutorCounts, TransactionStoreError> {
524+
let mut conn = self.redis.clone();
525+
let mut pipeline = twmq::redis::pipe();
526+
527+
pipeline.zcard(self.pending_transactions_zset_name());
528+
pipeline.zcard(self.submitted_transactions_zset_name());
529+
pipeline.hlen(self.borrowed_transactions_hashmap_name());
530+
pipeline.zcard(self.recycled_nonces_zset_name());
531+
532+
let counts: (u64, u64, u64, u64) = pipeline.query_async(&mut conn).await?;
533+
Ok(EoaExecutorCounts {
534+
pending_transactions: counts.0,
535+
submitted_transactions: counts.1,
536+
borrowed_transactions: counts.2,
537+
recycled_nonces: counts.3,
538+
})
539+
}
540+
536541
/// Peek at pending transactions without removing them (safe for planning)
537542
pub async fn peek_pending_transactions(
538543
&self,

executors/src/eoa/store/submitted.rs

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -354,15 +354,6 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
354354
(id, Some(confirmed_tx)) => {
355355
// Clean up confirmed transaction from Redis
356356
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
357-
358-
// IMMEDIATE CLEANUP: Delete all transaction data since it's confirmed
359-
// Note: Hash mappings will be cleaned up periodically by maintenance script
360-
// TODO: Buggy deletions, fix later
361-
// let keys_to_delete = self.keys.get_all_transaction_keys(id);
362-
// for key in keys_to_delete {
363-
// pipeline.del(&key);
364-
// }
365-
366357
let data_key_name = self.keys.transaction_data_key_name(id);
367358
pipeline.hset(&data_key_name, "status", "confirmed");
368359
pipeline.hset(&data_key_name, "completed_at", now);
@@ -377,6 +368,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
377368
let confirmed_timestamp = current_timestamp_ms();
378369
let queued_to_mined_duration =
379370
calculate_duration_seconds(tx.queued_at, confirmed_timestamp);
371+
// Record metrics using the clean EoaMetrics abstraction
380372
self.eoa_metrics.record_transaction_confirmed(
381373
self.keys.eoa,
382374
self.keys.chain_id,

0 commit comments

Comments
 (0)