Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions executors/src/eoa/store/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::eoa::{
},
worker::error::EoaExecutorWorkerError,
};
use crate::metrics::{current_timestamp_ms, calculate_duration_seconds, EoaMetrics};
use crate::metrics::{EoaMetrics, calculate_duration_seconds, current_timestamp_ms};
use crate::webhook::{WebhookJobHandler, queue_webhook_envelopes};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -62,7 +62,10 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
}

fn watch_keys(&self) -> Vec<String> {
vec![self.keys.borrowed_transactions_hashmap_name()]
vec![
self.keys.borrowed_transactions_hashmap_name(),
self.keys.recycled_nonces_zset_name(),
]
}

async fn validation(
Expand Down Expand Up @@ -121,15 +124,13 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
SubmissionResultType::Success => {
// Record metrics: transaction queued to sent
let sent_timestamp = current_timestamp_ms();
let queued_to_sent_duration = calculate_duration_seconds(
result.transaction.queued_at,
sent_timestamp
);
let queued_to_sent_duration =
calculate_duration_seconds(result.transaction.queued_at, sent_timestamp);
// Record metrics using the clean EoaMetrics abstraction
self.eoa_metrics.record_transaction_sent(
self.keys.eoa,
self.keys.chain_id,
queued_to_sent_duration
queued_to_sent_duration,
);

// Add to submitted zset
Expand Down Expand Up @@ -189,7 +190,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
// Update transaction data status
let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
pipeline.hset(&tx_data_key, "status", "pending");

// ask for this nonce to be recycled because we did not consume the nonce
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);

Expand Down
6 changes: 6 additions & 0 deletions executors/src/eoa/store/submitted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {
vec![
self.keys.recycled_nonces_zset_name(),
self.keys.last_transaction_count_key_name(),
self.keys.optimistic_transaction_count_key_name(),
self.keys.submitted_transactions_zset_name(),
]
}
Expand Down Expand Up @@ -618,6 +619,11 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {
"+inf",
);

pipeline.set(
self.keys.optimistic_transaction_count_key_name(),
highest_submitted_nonce + 1,
);

recycled_nonces
}
}
10 changes: 4 additions & 6 deletions executors/src/eoa/worker/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ use engine_core::{chain::Chain, error::AlloyRpcErrorToEngineError};
use crate::eoa::{
store::{BorrowedTransaction, PendingTransaction, SubmissionResult},
worker::{
EoaExecutorWorker,
error::{
EoaExecutorWorkerError, SendContext, is_retryable_preparation_error,
should_update_balance_threshold,
},
},
is_retryable_preparation_error, should_update_balance_threshold, EoaExecutorWorkerError, SendContext
}, EoaExecutorWorker
}, EoaExecutorStore,
};

const HEALTH_CHECK_INTERVAL_MS: u64 = 60 * 5 * 1000; // 5 minutes in milliseconds
Expand All @@ -20,7 +18,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
pub async fn send_flow(&self) -> Result<u32, EoaExecutorWorkerError> {
// 1. Get EOA health (initializes if needed) and check if we should update balance
let mut health = self.get_eoa_health().await?;
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
let now = EoaExecutorStore::now();

// Update balance if it's stale
// TODO: refactor this, very ugly
Expand Down