Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use builder::config::BuilderConfig;
use builder::service::serve_builder_with_span;
use builder::tasks::block::BlockBuilder;
use builder::tasks::oauth::Authenticator;
use builder::tasks::receipts::ReceiptTask;
use builder::tasks::submit::SubmitTask;
use metrics_exporter_prometheus::PrometheusBuilder;

Expand All @@ -27,13 +28,18 @@ async fn main() -> eyre::Result<()> {
let zenith = config.connect_zenith(host_provider.clone());

let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider);

let receipts = ReceiptTask { host_provider: host_provider.clone() };
let (tx_channel, receipts_jh) = receipts.spawn();

let submit = SubmitTask {
authenticator: authenticator.clone(),
host_provider,
zenith,
client: reqwest::Client::new(),
sequencer_signer,
config: config.clone(),
outbound_tx_channel: tx_channel,
};

let authenticator_jh = authenticator.spawn();
Expand All @@ -47,6 +53,9 @@ async fn main() -> eyre::Result<()> {
_ = submit_jh => {
tracing::info!("submit finished");
},
_ = receipts_jh => {
tracing::info!("receipts finished");
},
_ = build_jh => {
tracing::info!("build finished");
}
Expand Down
1 change: 1 addition & 0 deletions src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod block;
pub mod bundler;
pub mod oauth;
pub mod receipts;
pub mod submit;
pub mod tx_poller;
74 changes: 74 additions & 0 deletions src/tasks/receipts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use crate::config::Provider;
use alloy::{primitives::TxHash, providers::Provider as _};
use metrics::{counter, histogram};
use std::time::Instant;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{debug, error};

/// Submits sidecars in ethereum txns to mainnet ethereum
#[derive(Debug, Clone)]
pub struct ReceiptTask {
/// Ethereum Provider
pub host_provider: Provider,
}

impl ReceiptTask {
pub async fn log_tx(&self, pending_tx_hash: TxHash) {
// start timer when tx hash is received
let start: Instant = Instant::now();

// wait for the tx to mine, get its receipt
let receipt = self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await;

match receipt {
Ok(receipt) => {
match receipt {
Some(receipt) => {
// record how long it took to mine the transaction
// potential improvement: use the block timestamp to calculate the time elapsed
histogram!("receipts.tx_mine_time")
.record(start.elapsed().as_millis() as f64);

// log whether the transaction reverted
if receipt.status() {
counter!("receipts.tx_reverted").increment(1);
debug!(tx_hash = %pending_tx_hash, "tx reverted");
} else {
counter!("receipts.tx_succeeded").increment(1);
debug!(tx_hash = %pending_tx_hash, "tx succeeded");
}
}
None => {
counter!("receipts.no_receipt").increment(1);
error!("no receipt found for tx hash");
}
}
}
Err(e) => {
counter!("receipts.rpc_error").increment(1);
error!(error = ?e, "rpc error");
}
}
}

/// Spawns the task which collects metrics on pending transactions
pub fn spawn(self) -> (mpsc::UnboundedSender<TxHash>, JoinHandle<()>) {
let (sender, mut inbound) = mpsc::unbounded_channel();
let handle = tokio::spawn(async move {
loop {
if let Some(pending_tx_hash) = inbound.recv().await {
let this = self.clone();
tokio::spawn(async move {
let that = this.clone();
that.log_tx(pending_tx_hash).await;
});
} else {
tracing::debug!("upstream task gone");
break;
}
}
});

(sender, handle)
}
}
9 changes: 8 additions & 1 deletion src/tasks/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy::{
consensus::{constants::GWEI_TO_WEI, SimpleCoder},
eips::BlockNumberOrTag,
network::{TransactionBuilder, TransactionBuilder4844},
primitives::{FixedBytes, U256},
primitives::{FixedBytes, TxHash, U256},
providers::{Provider as _, SendableTx, WalletProvider},
rpc::types::eth::TransactionRequest,
signers::Signer,
Expand Down Expand Up @@ -59,6 +59,8 @@ pub struct SubmitTask {
pub config: crate::config::BuilderConfig,
/// Authenticator
pub authenticator: Authenticator,
// Channel over which to send pending transactions
pub outbound_tx_channel: mpsc::UnboundedSender<TxHash>,
}

impl SubmitTask {
Expand Down Expand Up @@ -192,6 +194,11 @@ impl SubmitTask {
spawn_provider_send!(&host_provider, &tx);
}

// send the in-progress transaction over the outbound_tx_channel
if self.outbound_tx_channel.send(*tx.tx_hash()).is_err() {
tracing::error!("receipts task gone");
}

// question mark unwraps join error, which would be an internal panic
// then if let checks for rpc error
if let Err(e) = fut.await? {
Expand Down
Loading