Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn main() -> eyre::Result<()> {
let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), slot_calculator));

let (basefee_jh, sim_cache_jh) =
sim.clone().spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());

let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel);

Expand Down
72 changes: 32 additions & 40 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Simulator {
/// A `JoinHandle` for the basefee updater and a `JoinHandle` for the
/// cache handler.
pub fn spawn_cache_tasks(
self: Arc<Self>,
&self,
tx_receiver: mpsc::UnboundedReceiver<TxEnvelope>,
bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
cache: SimCache,
Expand All @@ -139,9 +139,10 @@ impl Simulator {

let basefee_price = Arc::new(AtomicU64::new(0_u64));
let basefee_reader = Arc::clone(&basefee_price);
let fut = self.basefee_updater_fut(basefee_price);

// Update the basefee on a per-block cadence
let basefee_jh = tokio::spawn(async move { self.basefee_updater(basefee_price).await });
let basefee_jh = tokio::spawn(fut);

// Update the sim cache whenever a transaction or bundle is received with respect to the basefee
let cache_jh = tokio::spawn(async move {
Expand All @@ -162,45 +163,36 @@ impl Simulator {
/// # Arguments
///
/// - `price`: A shared `Arc<AtomicU64>` used to store the updated basefee value.
async fn basefee_updater(self: Arc<Self>, price: Arc<AtomicU64>) {
debug!("starting basefee updater");
loop {
// calculate start of next slot plus a small buffer
let time_remaining = self.slot_calculator.slot_duration()
- self.slot_calculator.current_timepoint_within_slot()
+ 1;
debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot");

// wait until that point in time
sleep(Duration::from_secs(time_remaining)).await;

// update the basefee with that price
self.check_basefee(&price).await;
}
}

/// Queries the latest block from the rollup provider and updates the shared
/// basefee value if a block is found.
///
/// This function retrieves the latest block using the provider, extracts the
/// `base_fee_per_gas` field from the block header (defaulting to zero if missing),
/// and updates the shared `AtomicU64` price tracker. If no block is available,
/// it logs a message without updating the price.
///
/// # Arguments
///
/// - `price`: A shared `Arc<AtomicU64>` used to store the updated basefee.
async fn check_basefee(&self, price: &Arc<AtomicU64>) {
let resp = self.ru_provider.get_block_by_number(Latest).await.inspect_err(|e| {
error!(error = %e, "RPC error during basefee update");
});
fn basefee_updater_fut(&self, price: Arc<AtomicU64>) -> impl Future<Output = ()> + use<>{
let slot_calculator = self.slot_calculator.clone();
let ru_provider = self.ru_provider.clone();

async move {
debug!("starting basefee updater");
loop {
// calculate start of next slot plus a small buffer
let time_remaining = slot_calculator.slot_duration()
- slot_calculator.current_timepoint_within_slot()
+ 1;
debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot");

// wait until that point in time
sleep(Duration::from_secs(time_remaining)).await;

// update the basefee with that price
let resp = ru_provider.get_block_by_number(Latest).await.inspect_err(|e| {
error!(error = %e, "RPC error during basefee update");
});

if let Ok(Some(block)) = resp {
let basefee = block.header.base_fee_per_gas.unwrap_or(0);
price.store(basefee, Ordering::Relaxed);
debug!(basefee = basefee, "basefee updated");
} else {
warn!("get basefee failed - an error likely occurred");
}
}

if let Ok(Some(block)) = resp {
let basefee = block.header.base_fee_per_gas.unwrap_or(0);
price.store(basefee, Ordering::Relaxed);
debug!(basefee = basefee, "basefee updated");
} else {
warn!("get basefee failed - an error likely occurred");
}
}

Expand Down
Loading