From c0bfc80ada7548f55447ab54efde383af5c344cb Mon Sep 17 00:00:00 2001 From: itamar Date: Mon, 4 Nov 2024 19:14:52 -0500 Subject: [PATCH] transaction submission logic --- .../src/auction/allocation_rule.rs | 3 + .../astria-auctioneer/src/auction/manager.rs | 33 ++- crates/astria-auctioneer/src/auction/mod.rs | 209 ++++++++++++------ .../astria-auctioneer/src/auctioneer/inner.rs | 1 - ...mitment_stream.rs => commitment_stream.rs} | 4 + .../src/block/executed_stream.rs | 16 +- crates/astria-auctioneer/src/block/mod.rs | 48 ++-- .../src/block/optimistic_stream.rs | 27 ++- crates/astria-auctioneer/src/bundle/client.rs | 2 + crates/astria-auctioneer/src/bundle/mod.rs | 14 ++ .../src/optimistic_executor/mod.rs | 103 +++++---- 11 files changed, 307 insertions(+), 153 deletions(-) rename crates/astria-auctioneer/src/block/{block_commitment_stream.rs => commitment_stream.rs} (89%) diff --git a/crates/astria-auctioneer/src/auction/allocation_rule.rs b/crates/astria-auctioneer/src/auction/allocation_rule.rs index 1b617c60f..77877a9ef 100644 --- a/crates/astria-auctioneer/src/auction/allocation_rule.rs +++ b/crates/astria-auctioneer/src/auction/allocation_rule.rs @@ -11,6 +11,9 @@ impl FirstPrice { } } + /// Submit a bundle with a bid. + /// + /// Returns `true` if the bid is accepted as the highest bid. pub(crate) fn bid(&mut self, bundle: Bundle) -> bool { if bundle.bid() > self.highest_bid.as_ref().map_or(0, |b| b.bid()) { self.highest_bid = Some(bundle); diff --git a/crates/astria-auctioneer/src/auction/manager.rs b/crates/astria-auctioneer/src/auction/manager.rs index 4e7e12851..18aa48c7e 100644 --- a/crates/astria-auctioneer/src/auction/manager.rs +++ b/crates/astria-auctioneer/src/auction/manager.rs @@ -122,7 +122,7 @@ impl Manager { pub(crate) fn new_auction(&mut self, auction_id: Id) { let (handle, auction) = super::Builder { metrics: self.metrics, - shutdown_token: self.shutdown_token.clone(), + shutdown_token: self.shutdown_token.child_token(), sequencer_grpc_client: self.sequencer_grpc_client.clone(), sequencer_abci_client: self.sequencer_abci_client.clone(), latency_margin: self.latency_margin, @@ -141,29 +141,40 @@ impl Manager { pub(crate) fn abort_auction(&mut self, auction_id: Id) -> eyre::Result<()> { // TODO: this should return an option in case the auction returned before being aborted - self.auction_handles + let handle = self + .auction_handles .get_mut(&auction_id) - .ok_or_eyre("unable to get handle for the given auction")? - .abort() - .wrap_err("failed to abort auction") + .ok_or_eyre("unable to get handle for the given auction")?; + + handle.abort().expect("should only abort once per auction"); + Ok(()) } #[instrument(skip(self))] pub(crate) fn start_timer(&mut self, auction_id: Id) -> eyre::Result<()> { - self.auction_handles + let handle = self + .auction_handles .get_mut(&auction_id) - .ok_or_eyre("unable to get handle for the given auction")? + .ok_or_eyre("unable to get handle for the given auction")?; + + handle .start_timer() - .wrap_err("failed to start timer") + .expect("should only start timer once per auction"); + + Ok(()) } #[instrument(skip(self))] pub(crate) fn start_processing_bids(&mut self, auction_id: Id) -> eyre::Result<()> { - self.auction_handles + let handle = self + .auction_handles .get_mut(&auction_id) - .ok_or_eyre("unable to get handle for the given auction")? + .ok_or_eyre("unable to get handle for the given auction")?; + + handle .start_processing_bids() - .wrap_err("failed to start processing bids") + .expect("should only start processing bids once per auction"); + Ok(()) } pub(crate) fn try_send_bundle(&mut self, auction_id: Id, bundle: Bundle) -> eyre::Result<()> { diff --git a/crates/astria-auctioneer/src/auction/mod.rs b/crates/astria-auctioneer/src/auction/mod.rs index af89c3a51..6238e1a17 100644 --- a/crates/astria-auctioneer/src/auction/mod.rs +++ b/crates/astria-auctioneer/src/auction/mod.rs @@ -17,10 +17,15 @@ use astria_eyre::eyre::{ self, eyre, Context, + ContextCompat, OptionExt as _, }; pub(crate) use builder::Builder; -use sequencer_client::Address; +use sequencer_client::{ + tendermint_rpc::endpoint::broadcast::tx_sync, + Address, + SequencerClientExt, +}; use telemetry::display::base64; use tokio::{ select, @@ -31,13 +36,13 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ + debug, + error, info, instrument, warn, Instrument, - Span, }; -use tryhard::backoff_strategies::ExponentialBackoff; use crate::{ bundle::Bundle, @@ -66,7 +71,14 @@ pub(crate) use manager::Manager; mod allocation_rule; +enum Command { + StartProcessingBids, + StartTimer, + Abort, +} + pub(crate) struct Handle { + commands_tx: mpsc::Sender, start_processing_bids_tx: Option>, start_timer_tx: Option>, abort_tx: Option>, @@ -78,7 +90,7 @@ impl Handle { let _ = self .abort_tx .take() - .expect("should only send reorg signal to a given auction once"); + .ok_or_eyre("should only send reorg signal to a given auction once")?; Ok(()) } @@ -87,7 +99,7 @@ impl Handle { let _ = self .start_processing_bids_tx .take() - .expect("should only send executed signal to a given auction once") + .ok_or_eyre("should only send executed signal to a given auction once")? .send(()); Ok(()) } @@ -96,7 +108,7 @@ impl Handle { let _ = self .start_timer_tx .take() - .expect("should only send block commitment signal to a given auction once") + .ok_or_eyre("should only send block commitment signal to a given auction once")? .send(()); Ok(()) @@ -112,7 +124,7 @@ impl Handle { } // TODO: should this be the same object as the auction? -struct Auction { +pub(crate) struct Auction { #[allow(dead_code)] metrics: &'static Metrics, shutdown_token: CancellationToken, @@ -176,7 +188,7 @@ impl Auction { signal = &mut self.start_processing_bids_rx, if !auction_is_open => { if let Err(e) = signal { - break Err(eyre!("exec signal channel closed")).wrap_err(e); + break Err(e).wrap_err("exec signal channel closed"); } // set auction to open so it starts collecting bids auction_is_open = true; @@ -184,100 +196,97 @@ impl Auction { signal = &mut self.start_timer_rx, if auction_is_open => { if let Err(e) = signal { - break Err(eyre!("commit signal channel closed")).wrap_err(e); + break Err(e).wrap_err("commit signal channel closed"); } // set the timer latency_margin_timer = Some(tokio::time::sleep(self.latency_margin)); - // TODO: also want to fetch the pending nonce here (we wait for commit because we want the pending nonce from after the commit) - nonce_fetch = Some(tokio::task::spawn(async { - // TODO: fetch the pending nonce using the sequencer client with tryhard - Ok(0) + let client = self.sequencer_grpc_client.clone(); + let address = self.sequencer_key.address().clone(); + + // we wait for commit because we want the pending nonce from after the commit + // TODO: fix lifetime issue with passing metrics here? + nonce_fetch = Some(tokio::task::spawn(async move { + get_pending_nonce(client, address).await })); } Some(bundle) = self.new_bundles_rx.recv(), if auction_is_open => { if allocation_rule.bid(bundle.clone()) { - info!(auction.id = %base64(self.auction_id), bundle.bid = %bundle.bid(), "new highest bid") + info!( + auction.id = %base64(self.auction_id), + bundle.bid = %bundle.bid(), + "received new highest bid" + ); + } else { + debug!( + auction.id = %base64(self.auction_id), + bundle.bid = %bundle.bid(), + "received bid lower than current highest bid, discarding" + ); } } } }; - // await the nonce fetch result + // TODO: separate the rest of this to a different object, e.g. AuctionResult? // TODO: flatten this or get rid of the option somehow? + // await the nonce fetch result let nonce = nonce_fetch - .expect("should have received commit to exit the bid loop") + .expect( + "should have received commit and fetched pending nonce before exiting the auction \ + loop", + ) .await - .wrap_err("task failed")? + .wrap_err("get_pending_nonce task failed")? .wrap_err("failed to fetch nonce")?; - // handle auction result + // serialize, sign and submit to the sequencer let transaction_body = auction_result - .wrap_err("")? + .wrap_err("auction failed unexpectedly")? .ok_or_eyre("auction ended with no winning bid")? - .into_transaction_body(nonce, self.rollup_id, self.fee_asset_denomination.clone()); + .into_transaction_body( + nonce, + self.rollup_id, + self.fee_asset_denomination.clone(), + self.sequencer_chain_id, + ); let transaction = transaction_body.sign(self.sequencer_key.signing_key()); let submission_result = select! { biased; - // TODO: should this be Ok(())? - () = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal")), - - // submit the transaction to the sequencer - result = self.submit_transaction(transaction) => { - // TODO: handle submission failure better? - result + // TODO: should this be Ok(())? or Ok("received shutdown signal")? + () = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal during auction result submission")), + + result = submit_transaction(self.sequencer_abci_client.clone(), transaction, self.metrics) => { + // TODO: how to handle submission failure better? + match result { + Ok(resp) => { + // TODO: handle failed submission instead of just logging the result + info!(auction.id = %base64(self.auction_id), auction.result = %resp.log, "auction result submitted to sequencer"); + Ok(()) + }, + Err(e) => { + error!(auction.id = %base64(self.auction_id), err = %e, "failed to submit auction result to sequencer"); + Err(e).wrap_err("failed to submit auction result to sequencer") + }, + } } }; - submission_result } - - #[instrument(skip_all, fields(auction.id = %base64(self.auction_id), %address, err))] - async fn get_pending_nonce(&self, address: Address) -> eyre::Result { - let span = tracing::Span::current(); - let retry_cfg = make_retry_cfg("get pending nonce".into(), span); - let client = self.sequencer_grpc_client.clone(); - - let nonce = tryhard::retry_fn(|| { - let mut client = client.clone(); - let address = address.clone(); - - async move { - client - .get_pending_nonce(GetPendingNonceRequest { - address: Some(address.into_raw()), - }) - .await - } - }) - .with_config(retry_cfg) - .in_current_span() - .await - .wrap_err("failed to get pending nonce")? - .into_inner() - .inner; - - Ok(nonce) - } - - async fn submit_transaction(&self, _transaction: Transaction) -> eyre::Result<()> { - unimplemented!() - } } -fn make_retry_cfg( - msg: String, - span: Span, -) -> tryhard::RetryFutureConfig< - ExponentialBackoff, - impl Fn(u32, Option, &tonic::Status) -> futures::future::Ready<()>, -> { - tryhard::RetryFutureConfig::new(1024) +#[instrument(skip_all, fields(%address, err))] +async fn get_pending_nonce( + client: SequencerServiceClient, + address: Address, +) -> eyre::Result { + let span = tracing::Span::current(); + let retry_cfg = tryhard::RetryFutureConfig::new(1024) .exponential_backoff(Duration::from_millis(100)) .max_delay(Duration::from_secs(2)) .on_retry( @@ -290,9 +299,69 @@ fn make_retry_cfg( attempt, wait_duration, error = error as &dyn std::error::Error, - "attempt to {msg} failed; retrying after backoff", + "attempt to get pending nonce failed; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + + let nonce = tryhard::retry_fn(|| { + let mut client = client.clone(); + let address = address.clone(); + + async move { + client + .get_pending_nonce(GetPendingNonceRequest { + address: Some(address.into_raw()), + }) + .await + } + }) + .with_config(retry_cfg) + .in_current_span() + .await + .wrap_err("failed to get pending nonce")? + .into_inner() + .inner; + + Ok(nonce) +} + +async fn submit_transaction( + client: sequencer_client::HttpClient, + transaction: Transaction, + _metrics: &'static Metrics, +) -> eyre::Result { + let span = tracing::Span::current(); + let retry_cfg = tryhard::RetryFutureConfig::new(1024) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)) + .on_retry( + move |attempt: u32, + next_delay: Option, + error: &sequencer_client::extension_trait::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + parent: &span, + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to submit transaction failed; retrying after backoff", ); futures::future::ready(()) }, - ) + ); + + tryhard::retry_fn(|| { + let client = client.clone(); + let transaction = transaction.clone(); + + async move { client.submit_transaction_sync(transaction).await } + }) + .with_config(retry_cfg) + .in_current_span() + .await + .wrap_err("failed to submit transaction") } diff --git a/crates/astria-auctioneer/src/auctioneer/inner.rs b/crates/astria-auctioneer/src/auctioneer/inner.rs index 577a238c8..f34d44bf4 100644 --- a/crates/astria-auctioneer/src/auctioneer/inner.rs +++ b/crates/astria-auctioneer/src/auctioneer/inner.rs @@ -36,7 +36,6 @@ pub(super) struct Auctioneer { } impl Auctioneer { - const AUCTION_DRIVER: &'static str = "auction_driver"; const OPTIMISTIC_EXECUTOR: &'static str = "optimistic_executor"; const _BUNDLE_COLLECTOR: &'static str = "bundle_collector"; diff --git a/crates/astria-auctioneer/src/block/block_commitment_stream.rs b/crates/astria-auctioneer/src/block/commitment_stream.rs similarity index 89% rename from crates/astria-auctioneer/src/block/block_commitment_stream.rs rename to crates/astria-auctioneer/src/block/commitment_stream.rs index ccdc0d3d0..fd68bfe5c 100644 --- a/crates/astria-auctioneer/src/block/block_commitment_stream.rs +++ b/crates/astria-auctioneer/src/block/commitment_stream.rs @@ -10,6 +10,8 @@ use futures::{ Stream, StreamExt as _, }; +use telemetry::display::base64; +use tracing::debug; use super::Commitment; use crate::optimistic_block_client::OptimisticBlockClient; @@ -48,6 +50,8 @@ impl Stream for BlockCommitmentStream { let commitment = Commitment::try_from_raw(raw).wrap_err("failed to parse raw to BlockCommitment")?; + debug!(block_commitment.sequencer_block_hash = %base64(&commitment.sequencer_block_hash()), "received block commitment"); + std::task::Poll::Ready(Some(Ok(commitment))) } } diff --git a/crates/astria-auctioneer/src/block/executed_stream.rs b/crates/astria-auctioneer/src/block/executed_stream.rs index 725e8e5f5..6edc0c957 100644 --- a/crates/astria-auctioneer/src/block/executed_stream.rs +++ b/crates/astria-auctioneer/src/block/executed_stream.rs @@ -16,9 +16,13 @@ use futures::{ Stream, StreamExt, }; +use telemetry::display::base64; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tracing::info; +use tracing::{ + debug, + error, +}; use super::{ Executed, @@ -80,6 +84,12 @@ impl Stream for ExecutedBlockStream { let executed_block = Executed::try_from_raw(raw).wrap_err("failed to parse raw to Executed")?; + debug!( + executed_block.rollup_block_hash = %base64(executed_block.rollup_block_hash()), + executed_block.sequencer_block_hash = %base64(executed_block.sequencer_block_hash()), + "received block execution result" + ); + std::task::Poll::Ready(Some(Ok(executed_block))) } } @@ -100,13 +110,13 @@ pub(crate) fn make_execution_requests_stream( .try_into_base_block(rollup_id) .wrap_err("failed to create BaseBlock from FilteredSequencerBlock"); - // skip blocks which fail to produce a BaseBlock for the given rollup_id + // skip blocks which fail to decode the transactions? match base_block { Ok(base_block) => Some(ExecuteOptimisticBlockStreamRequest { base_block: Some(base_block), }), Err(e) => { - info!(error = ?e, "skipping execution of invalid block"); + error!(error = ?e, "skipping execution of invalid block"); None } diff --git a/crates/astria-auctioneer/src/block/mod.rs b/crates/astria-auctioneer/src/block/mod.rs index 8b9b50cd8..7deaa9bf7 100644 --- a/crates/astria-auctioneer/src/block/mod.rs +++ b/crates/astria-auctioneer/src/block/mod.rs @@ -18,12 +18,11 @@ use astria_eyre::eyre::{ self, eyre, Context, - OptionExt, }; use bytes::Bytes; use prost::Message as _; -pub(crate) mod block_commitment_stream; +pub(crate) mod commitment_stream; pub(crate) mod executed_stream; pub(crate) mod optimistic_stream; @@ -55,10 +54,9 @@ impl Optimistic { }) } - pub(crate) fn into_raw(self) -> raw_sequencer_block::FilteredSequencerBlock { - self.filtered_sequencer_block.into_raw() - } - + /// Converts this [`Optimistic`] into a [`BaseBlock`] for the given `rollup_id`. + /// If there are no transactions for the given `rollup_id`, this will return a `BaseBlock`. + // TODO: add typed errors here? pub(crate) fn try_into_base_block( self, rollup_id: RollupId, @@ -70,19 +68,19 @@ impl Optimistic { .. } = self.filtered_sequencer_block.into_parts(); - let serialized_transactions = rollup_transactions + let maybe_serialized_transactions = rollup_transactions .swap_remove(&rollup_id) - .ok_or_eyre( - "FilteredSequencerBlock does not contain transactions for the given rollup", - )? - .into_parts(); - - let transactions = serialized_transactions - .transactions - .into_iter() - .map(raw_sequencer_block::RollupData::decode) - .collect::>() - .wrap_err("failed to decode RollupData")?; + .map(|transactions| transactions.into_parts()); + + let transactions = + maybe_serialized_transactions.map_or(Ok(vec![]), |serialized_transactions| { + serialized_transactions + .transactions + .into_iter() + .map(raw_sequencer_block::RollupData::decode) + .collect::>() + .wrap_err("failed to decode RollupData") + })?; let timestamp = Some(convert_tendermint_time_to_protobuf_timestamp(header.time())); @@ -135,6 +133,14 @@ impl Executed { } pub(crate) fn parent_rollup_block_hash(&self) -> [u8; 32] { + self.block + .parent_block_hash() + .as_ref() + .try_into() + .expect("rollup block hash must be 32 bytes") + } + + pub(crate) fn rollup_block_hash(&self) -> [u8; 32] { self.block .hash() .as_ref() @@ -218,4 +224,10 @@ impl Current { pub(crate) fn sequencer_block_hash(&self) -> [u8; 32] { self.optimistic.sequencer_block_hash() } + + pub(crate) fn rollup_parent_block_hash(&self) -> Option<[u8; 32]> { + self.executed + .as_ref() + .map(|executed| executed.parent_rollup_block_hash()) + } } diff --git a/crates/astria-auctioneer/src/block/optimistic_stream.rs b/crates/astria-auctioneer/src/block/optimistic_stream.rs index e435d71ab..cd052f4d4 100644 --- a/crates/astria-auctioneer/src/block/optimistic_stream.rs +++ b/crates/astria-auctioneer/src/block/optimistic_stream.rs @@ -13,11 +13,14 @@ use futures::{ Stream, StreamExt as _, }; +use telemetry::display::base64; +use tracing::debug; use super::Optimistic; use crate::optimistic_block_client::OptimisticBlockClient; /// A stream for receiving optimistic blocks from the sequencer. +// TODO: pin project these instead pub(crate) struct OptimisticBlockStream { client: Pin>>, } @@ -33,7 +36,6 @@ impl OptimisticBlockStream { .wrap_err("failed to stream optimistic blocks")?; Ok(OptimisticBlockStream { - // client, client: Box::pin(optimistic_stream_client), }) } @@ -46,18 +48,25 @@ impl Stream for OptimisticBlockStream { mut self: Pin<&mut Self>, cx: &mut std::task::Context, ) -> std::task::Poll> { - let raw = futures::ready!(self.client.poll_next_unpin(cx)) - // TODO: better error messages here - .ok_or_eyre("stream has been closed")? - .wrap_err("received gRPC error")? - .block - .ok_or_eyre( - "optimsitic block stream response did not contain filtered sequencer block", - )?; + // TODO: return none when stream is closed + let rsp = match futures::ready!(self.client.poll_next_unpin(cx)) { + Some(raw) => raw, + None => return std::task::Poll::Ready(None), + }; + + // TODO: filter_map on these errors + let raw = rsp.wrap_err("received gRPC error")?.block.ok_or_eyre( + "optimsitic block stream response did not contain filtered sequencer block", + )?; let optimistic_block = Optimistic::try_from_raw(raw).wrap_err("failed to parse raw to Optimistic")?; + debug!( + optimistic_block.sequencer_block_hash = %base64(optimistic_block.sequencer_block_hash()), + "received optimistic block from sequencer" + ); + std::task::Poll::Ready(Some(Ok(optimistic_block))) } } diff --git a/crates/astria-auctioneer/src/bundle/client.rs b/crates/astria-auctioneer/src/bundle/client.rs index 2adfceb26..50de5bfdf 100644 --- a/crates/astria-auctioneer/src/bundle/client.rs +++ b/crates/astria-auctioneer/src/bundle/client.rs @@ -20,6 +20,7 @@ use futures::{ }; use tonic::transport::Endpoint; use tracing::{ + instrument, warn, Instrument, Span, @@ -47,6 +48,7 @@ impl BundleClient { }) } + #[instrument(skip_all, fields(uri = %self.uri))] pub(crate) async fn get_bundle_stream( &mut self, ) -> eyre::Result> { diff --git a/crates/astria-auctioneer/src/bundle/mod.rs b/crates/astria-auctioneer/src/bundle/mod.rs index 575850438..a5f79b359 100644 --- a/crates/astria-auctioneer/src/bundle/mod.rs +++ b/crates/astria-auctioneer/src/bundle/mod.rs @@ -29,6 +29,7 @@ pub(crate) struct Bundle { /// The byte list of transactions fto be included. transactions: Vec, /// The hash of the rollup block that this bundle is based on. + // TODO: rename this to `parent_rollup_block_hash` to match execution api prev_rollup_block_hash: [u8; 32], /// The hash of the sequencer block used to derive the rollup block that this bundle is based /// on. @@ -71,9 +72,13 @@ impl Bundle { nonce: u32, rollup_id: RollupId, fee_asset: asset::Denom, + chain_id: String, ) -> TransactionBody { let data = self.into_raw().encode_to_vec(); + // TODO: sign the bundle data and put it in a `SignedBundle` message or something (need to + // update protos for this) + TransactionBody::builder() .actions(vec![ RollupDataSubmission { @@ -84,6 +89,7 @@ impl Bundle { .into(), ]) .nonce(nonce) + .chain_id(chain_id) .try_build() .expect("failed to build transaction body") } @@ -91,4 +97,12 @@ impl Bundle { pub(crate) fn bid(&self) -> u64 { self.fee } + + pub(crate) fn prev_rollup_block_hash(&self) -> [u8; 32] { + self.prev_rollup_block_hash + } + + pub(crate) fn base_sequencer_block_hash(&self) -> [u8; 32] { + self.base_sequencer_block_hash + } } diff --git a/crates/astria-auctioneer/src/optimistic_executor/mod.rs b/crates/astria-auctioneer/src/optimistic_executor/mod.rs index 5fde8be40..04e9221e5 100644 --- a/crates/astria-auctioneer/src/optimistic_executor/mod.rs +++ b/crates/astria-auctioneer/src/optimistic_executor/mod.rs @@ -1,17 +1,12 @@ -mod builder; - -use std::time::Duration; +//! - [ ] mention backpressure here -use astria_core::primitive::v1::{ - asset, - RollupId, -}; +use astria_core::primitive::v1::RollupId; use astria_eyre::eyre::{ self, + eyre, OptionExt, WrapErr as _, }; -pub(crate) use builder::Builder; use telemetry::display::base64; use tokio::select; use tokio_stream::StreamExt as _; @@ -26,7 +21,7 @@ use crate::{ auction, block::{ self, - block_commitment_stream::BlockCommitmentStream, + commitment_stream::BlockCommitmentStream, executed_stream::ExecutedBlockStream, optimistic_stream::OptimisticBlockStream, }, @@ -37,6 +32,18 @@ use crate::{ optimistic_block_client::OptimisticBlockClient, }; +mod builder; +pub(crate) use builder::Builder; + +macro_rules! break_for_closed_stream { + ($stream_res:expr, $msg:expr) => { + match $stream_res { + Some(val) => val, + None => break Err(eyre!($msg)), + } + }; +} + pub(crate) struct Startup { #[allow(dead_code)] metrics: &'static crate::Metrics, @@ -60,7 +67,6 @@ impl Startup { let sequencer_client = OptimisticBlockClient::new(&sequencer_grpc_endpoint) .wrap_err("failed to initialize sequencer grpc client")?; - // TODO: have a connect streams helper? let mut optimistic_blocks = OptimisticBlockStream::connect(rollup_id, sequencer_client.clone()) .await @@ -78,8 +84,6 @@ impl Startup { let bundle_stream = BundleStream::connect(rollup_grpc_endpoint) .await .wrap_err("failed to initialize bundle stream")?; - // let bundle_stream = BundleServiceClient::new(bundle_service_grpc_url) - // .wrap_err("failed to initialize bundle service grpc client")?; let optimistic_block = optimistic_blocks .next() @@ -117,6 +121,7 @@ pub(crate) struct Running { impl Running { pub(crate) async fn run(mut self) -> eyre::Result<()> { let reason: eyre::Result<&str> = { + // This is a long running loop. Errors are emitted inside the handlers. loop { select! { biased; @@ -125,33 +130,33 @@ impl Running { }, Some((id, res)) = self.auctions.join_next() => { - // TODO: why doesnt this use `id` + // TODO: this seems wrong? res.wrap_err_with(|| "auction failed for block {id}").map(|_| "auction {id} failed")?; }, - Some(res) = self.optimistic_blocks.next() => { - let optimistic_block = res.wrap_err("failed to get optimistic block")?; + res = self.optimistic_blocks.next() => { + let res = break_for_closed_stream!(res, "optimistic block stream closed"); - self.optimistic_block_handler(optimistic_block).wrap_err("failed to handle optimistic block")?; + let _ = self.handle_optimistic_block(res); }, Some(res) = self.block_commitments.next() => { - let block_commitment = res.wrap_err("failed to get block commitment")?; + let block_commitment = tri!(res.wrap_err("failed to get block commitment")); - self.block_commitment_handler(block_commitment).wrap_err("failed to handle block commitment")?; + let _ = self.handle_block_commitment(block_commitment); }, Some(res) = self.executed_blocks.next() => { let executed_block = res.wrap_err("failed to get executed block")?; - self.executed_block_handler(executed_block).wrap_err("failed to handle executed block")?; + let _ = self.handle_executed_block(executed_block); } Some(res) = self.bundle_stream.next() => { let bundle = res.wrap_err("failed to get bundle")?; - self.bundle_handler(bundle).wrap_err("failed to handle bundle")?; + let _ = self.handle_incoming_bundle(bundle); } } } @@ -162,15 +167,16 @@ impl Running { Err(err) => error!(%err, "shutting down due to error"), }; - self.shutdown().await; Ok(()) } - #[instrument(skip(self), fields(auction.old_id = %base64(self.current_block.sequencer_block_hash())))] - fn optimistic_block_handler( + #[instrument(skip(self), fields(auction.old_id = %base64(self.current_block.sequencer_block_hash())), err)] + fn handle_optimistic_block( &mut self, - optimistic_block: block::Optimistic, + optimistic_block: eyre::Result, ) -> eyre::Result<()> { + let optimistic_block = optimistic_block.wrap_err("failed receiving optimistic block")?; + let old_auction_id = auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash()); self.auctions @@ -188,9 +194,8 @@ impl Running { auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash()); self.auctions.new_auction(new_auction_id); - // create and run the auction fut and save the its handles // forward the optimistic block to the rollup's optimistic execution server - // TODO: don't want to exit on this, just complain with a log and skip the block or smth + // TODO: don't want to exit on this, just complain with a log and skip the block or smth? self.blocks_to_execute_handle .try_send_block_to_execute(optimistic_block) .wrap_err("failed to send optimistic block for execution")?; @@ -198,18 +203,16 @@ impl Running { Ok(()) } - #[instrument(skip(self), fields(auction.id = %base64(self.current_block.sequencer_block_hash())))] - fn block_commitment_handler( - &mut self, - block_commitment: block::Commitment, - ) -> eyre::Result<()> { - // TODO: handle this with a log instead of exiting + #[instrument(skip_all, fields(auction.id = %base64(self.current_block.sequencer_block_hash())), err)] + fn handle_block_commitment(&mut self, block_commitment: block::Commitment) -> eyre::Result<()> { + // TODO: handle this with a log instead of exiting? self.current_block .commitment(block_commitment) .wrap_err("failed to set block commitment")?; let auction_id = auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash()); + self.auctions .start_timer(auction_id) .wrap_err("failed to start timer")?; @@ -217,15 +220,16 @@ impl Running { Ok(()) } - #[instrument(skip(self), fields(auction.id = %base64(self.current_block.sequencer_block_hash())))] - fn executed_block_handler(&mut self, executed_block: block::Executed) -> eyre::Result<()> { - // TODO: handle this with a log instead of exiting + #[instrument(skip_all, fields(auction.id = %base64(self.current_block.sequencer_block_hash())))] + fn handle_executed_block(&mut self, executed_block: block::Executed) -> eyre::Result<()> { + // TODO: handle this with a log instead of exiting? self.current_block .execute(executed_block) .wrap_err("failed to set block to executed")?; let auction_id = auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash()); + self.auctions .start_processing_bids(auction_id) .wrap_err("failed to start processing bids")?; @@ -233,8 +237,29 @@ impl Running { Ok(()) } - #[instrument(skip(self), fields(auction.id = %base64(self.current_block.sequencer_block_hash())))] - fn bundle_handler(&mut self, bundle: Bundle) -> eyre::Result<()> { + #[instrument(skip_all, fields(auction.id = %base64(self.current_block.sequencer_block_hash())))] + fn handle_incoming_bundle(&mut self, bundle: Bundle) -> eyre::Result<()> { + // TODO: use ensure! here and provide the hashes in the error + if bundle.base_sequencer_block_hash() != self.current_block.sequencer_block_hash() { + return Err(eyre!( + "incoming bundle's {sequencer_block_hash} does not match current sequencer block \ + hash" + )); + } + + if let Some(rollup_parent_block_hash) = self.current_block.rollup_parent_block_hash() { + if bundle.prev_rollup_block_hash() != rollup_parent_block_hash { + return Err(eyre!( + "bundle's rollup parent block hash does not match current rollup parent block \ + hash" + )); + } + } else { + // TODO: should i buffer these up in the channel until the block is executed and then + // filter them in the auction if the parent hashes dont match? + return Err(eyre!("current block has not been executed yet.")); + } + let auction_id = auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash()); self.auctions @@ -243,8 +268,4 @@ impl Running { Ok(()) } - - async fn shutdown(self) { - self.shutdown_token.cancel(); - } }