Skip to content

Commit

Permalink
add stream
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarreif committed Oct 2, 2024
1 parent 4d944f1 commit e0409f0
Show file tree
Hide file tree
Showing 8 changed files with 508 additions and 325 deletions.
562 changes: 280 additions & 282 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/astria-auctioneer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur
async-trait = { workspace = true }
axum = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
pin-project-lite = { workspace = true }
Expand Down
22 changes: 12 additions & 10 deletions crates/astria-auctioneer/src/auction_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ impl AuctionDriver {

// TODO: should this be conditioned on the block state not being committed? or the
// auction state not be "closing"? instead of advancing the block state here
// i should have a handle that reads current state from an arc? curr_block =
// curr_block.apply_state(), if auction.committed() => { match curr_block.
// state() { optimistic
// drop old auction if it exists
// make new auction
// executed -> open the auction for bids
// committed -> start the timer for closing
// committed and executed -> ?
// },
// }
// i should have a handle that reads current state from an arc?
// curr_block =
// curr_block.apply_state(), if !auction.committed() => {
// match curr_block.state() {
// optimistic ->
// drop old auction if it exists
// make new auction
// executed -> open the auction for bids
// committed -> start the timer for closing
// committed and executed -> ?
// },
// }
// },

// new bundle from the bundle stream -> if auction is open, add the bid to the auction.
Expand Down
57 changes: 53 additions & 4 deletions crates/astria-auctioneer/src/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@ use tokio::sync::watch;
// TODO: these should be created from the protos
#[derive(Debug, Clone)]
pub(crate) struct Optimistic {
sequencer_block: StreamOptimisticBlockResponse,
// TODO: actually convert this instead of just wrapping
raw: StreamOptimisticBlockResponse,
}

impl Optimistic {
pub(crate) fn from_raw(raw: StreamOptimisticBlockResponse) -> Self {
Self {
raw,
}
}

pub(crate) fn into_raw(self) -> StreamOptimisticBlockResponse {
self.raw
}

fn into_executed_block(self, _executed_block: Executed) -> Executed {
todo!()
}
Expand All @@ -29,11 +40,21 @@ impl Optimistic {

#[derive(Debug, Clone)]
pub(crate) struct Executed {
executed_block: execution::v1alpha2::Block,
raw: execution::v1alpha2::Block,
}

impl Executed {
fn into_exec_and_commit(self) -> Committed {
pub(crate) fn from_raw(raw: execution::v1alpha2::Block) -> Self {
Self {
raw,
}
}

pub(crate) fn into_raw(self) -> execution::v1alpha2::Block {
self.raw
}

fn into_exec_and_commit(self, _commit: Committed) -> ExecutedAndCommitted {
todo!()
}

Expand All @@ -44,10 +65,20 @@ impl Executed {

#[derive(Debug, Clone)]
pub(crate) struct Committed {
commit: SequencerBlockCommit,
raw: SequencerBlockCommit,
}

impl Committed {
pub(crate) fn from_raw(raw: SequencerBlockCommit) -> Self {
Self {
raw,
}
}

pub(crate) fn into_raw(self) -> SequencerBlockCommit {
self.raw
}

fn into_exec_and_commit(self, _executed_block: Executed) -> ExecutedAndCommitted {
todo!()
}
Expand Down Expand Up @@ -85,6 +116,24 @@ impl State {
}
}

// TODO: instead of state, should `CurrentBlock` just be:
// pub(crate) struct CurrentBlock {
// optimistic: Option<Optimistic>,
// executed: Option<Executed>,
// committed: Option<Committed>,
// }

// impl CurrentBlock {
// pub(crate) fn is_optimistic(self) -> bool {
// self.optimistic.is_some()
// }

// pub(crate) fn apply_optimistic(self, new_block: Optimistic) -> Self {
// unimplemented!()
// }
// // etc
// }

#[derive(Debug, Clone)]
pub(crate) struct CurrentBlock {
inner: State,
Expand Down
32 changes: 28 additions & 4 deletions crates/astria-auctioneer/src/optimistic_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,77 @@ mod committed_block_stream;
mod executed_block_stream;
mod optimistic_block_stream;

use astria_core::primitive::v1::RollupId;
use astria_eyre::eyre;
pub(crate) use builder::Builder;
use optimistic_block_stream::OptimisticBlockStream;
use sequencer_client::SequencerGrpcClient;
use tokio::{
select,
sync::{
mpsc,
watch,
},
};
use tokio_stream::StreamExt as _;
use tracing::info;

use crate::block::{
self,
Committed,
CurrentBlock,
Executed,
Optimistic,
};

mod sequencer_client;
use astria_eyre::eyre::WrapErr as _;

pub(crate) struct Handle {
block_rx: watch::Receiver<CurrentBlock>,
}

pub(crate) struct OptimisticExecutor {
sequencer_grpc_url: String,
rollup_id: RollupId,
optimistic_blocks_rx: mpsc::Receiver<Optimistic>,
executed_blocks_rx: mpsc::Receiver<Executed>,
block_commitments_rx: mpsc::Receiver<Committed>,
// watch::Sender<CurrentBlock>,
block: CurrentBlock,
}

impl OptimisticExecutor {
pub(crate) async fn run(mut self) -> eyre::Result<()> {
let Self {
optimistic_blocks_rx: mut opt_rx,
sequencer_grpc_url,
rollup_id,
executed_blocks_rx: mut exec_rx,
block_commitments_rx: mut commit_rx,
mut block,
..
} = self;

// TODO: use grpc streams instead of channels
let mut sequencer_client = SequencerGrpcClient::new(&sequencer_grpc_url)
.wrap_err("failed to initialize sequencer grpc client")?;
let stream_client = sequencer_client
.optimistic_block_stream(rollup_id)
.await
.wrap_err("failed to stream optimistic blocks")?;
let mut optimistic_block_stream = OptimisticBlockStream::new(stream_client);

// TODO: probably want to interact with the block state machine via the handle
loop {
let old_block = block.clone();
let new_block = select! {
opt = opt_rx.recv() => {
block.apply_optimistic_block(opt.unwrap())
optimistic_block = optimistic_block_stream.next() => {
// TODO: stop doing this unwrap unwrap thing with the stream
// curr_block = block.borrow();
// let next = curr_block.apply_optimistic_block(optimistic_block.unwrap().unwrap())
// block.send(next);


// TODO: add execute_fut fused future that sends this to the optimistic execution stream
},
exec = exec_rx.recv() => {
block.apply_executed_block(exec.unwrap())
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,45 @@
mod builder;
pub(crate) use builder::Builder;
use std::pin::Pin;

use crate::Metrics;
use astria_core::generated::sequencerblock::v1alpha1::StreamOptimisticBlockResponse;
use astria_eyre::eyre;
use futures::{
Stream,
StreamExt as _,
};

use crate::block;

pub(crate) struct OptimisticBlockStream {
#[allow(dead_code)]
metrics: &'static Metrics,
// TODO: does this need to be pinned?
// client: tonic::Streaming<StreamOptimisticBlockResponse>,
client: Pin<Box<dyn Stream<Item = Result<StreamOptimisticBlockResponse, tonic::Status>>>>,
}

impl OptimisticBlockStream {
pub(crate) fn new(client: tonic::Streaming<StreamOptimisticBlockResponse>) -> Self {
Self {
// client,
client: Box::pin(client),
}
}
}

// this should have a stream impl that produces `block::Optimistic` structs

impl Stream for OptimisticBlockStream {
type Item = eyre::Result<block::Optimistic>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context,
) -> std::task::Poll<Option<Self::Item>> {
// is this the correct way to use use tonic::Streaming here
// TODO: don't unwrap unwrap the streams
let raw = futures::ready!(self.client.poll_next_unpin(cx))
.unwrap()
.unwrap();
// convert raw to block::Optimistic
let opt = block::Optimistic::from_raw(raw);
std::task::Poll::Ready(Some(Ok(opt)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::time::Duration;

use astria_core::{
generated::sequencerblock::v1alpha1::{
optimistic_block_service_client::OptimisticBlockServiceClient,
StreamOptimisticBlockRequest,
StreamOptimisticBlockResponse,
},
primitive::v1::RollupId,
};
use astria_eyre::eyre::{
self,
WrapErr as _,
};
use tonic::transport::{
Channel,
Endpoint,
Uri,
};
use tracing::{
instrument,
warn,
Instrument as _,
};

/// Wraps the gRPC client for the Sequencer service that wraps client calls with `tryhard`.
#[derive(Debug, Clone)]
pub(crate) struct SequencerGrpcClient {
inner: OptimisticBlockServiceClient<Channel>,
uri: Uri,
}

impl SequencerGrpcClient {
pub(crate) fn new(sequencer_uri: &str) -> eyre::Result<Self> {
let uri: Uri = sequencer_uri
.parse()
.wrap_err("failed parsing provided string as Uri")?;

// TODO: use a UDS socket instead
let endpoint = Endpoint::from(uri.clone());
let inner = OptimisticBlockServiceClient::new(endpoint.connect_lazy());
Ok(Self {
inner,
uri,
})
}

#[instrument(skip_all, fields(
uri = %self.uri,
%rollup_id,
err,
))]
pub(super) async fn optimistic_block_stream(
&mut self,
rollup_id: RollupId,
) -> eyre::Result<tonic::Streaming<StreamOptimisticBlockResponse>> {
let span = tracing::Span::current();
let retry_cfg = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2))
.on_retry(
|attempt: u32, next_delay: Option<Duration>, error: &tonic::Status| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
parent: &span,
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to initialize optimistic block stream failed; retrying after backoff",
);
futures::future::ready(())
},
);

let client = self.inner.clone();
let stream = tryhard::retry_fn(|| {
let mut client = client.clone();
let req = StreamOptimisticBlockRequest {
rollup_id: Some(rollup_id.into_raw()),
};
async move { client.stream_optimistic_block(req).await }
})
.with_config(retry_cfg)
.in_current_span()
.await
.wrap_err("failed to initialize optimistic block stream")?
.into_inner();

Ok(stream)
}
}

0 comments on commit e0409f0

Please sign in to comment.