Skip to content

Commit

Permalink
chore(bridge-withdrawer): pass GRPC and CometBFT clients to consumers…
Browse files Browse the repository at this point in the history
… directly (#1510)

## Summary
Changed `sequencer_cometbft_client` and `sequencer_grpc_client`
construction so that each can be passed to consumers via cloning.

## Background
Both clients were previously constructed ad-hoc from endpoints, which
was unnecessary.

## Changes
- Moved construction of `sequencer_cometbft_client` and
`sequencer_grpc_client` up so that each can be passed to its consumers
(`Startup` and `Submitter`) via cloning.

## Testing
Passing all tests

## Related Issues
closes #1315
  • Loading branch information
ethanoroshiba authored Sep 18, 2024
1 parent 1be156e commit f7ef132
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 44 deletions.
20 changes: 14 additions & 6 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
time::Duration,
};

use astria_core::generated::sequencerblock::v1alpha1::sequencer_service_client;
use astria_eyre::eyre::{
self,
WrapErr as _,
Expand Down Expand Up @@ -92,28 +93,35 @@ impl BridgeWithdrawer {
.parse()
.wrap_err("failed to parse sequencer bridge address")?;

let sequencer_grpc_connection =
tonic::transport::Endpoint::new(sequencer_grpc_endpoint)?.connect_lazy();
let sequencer_grpc_client =
sequencer_service_client::SequencerServiceClient::new(sequencer_grpc_connection);
let sequencer_cometbft_client =
sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint)
.wrap_err("failed constructing cometbft http client")?;

// make startup object
let startup = startup::Builder {
shutdown_token: shutdown_handle.token(),
state: state.clone(),
sequencer_chain_id,
sequencer_cometbft_endpoint: sequencer_cometbft_endpoint.clone(),
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
sequencer_bridge_address,
sequencer_grpc_endpoint: sequencer_grpc_endpoint.clone(),
sequencer_grpc_client: sequencer_grpc_client.clone(),
expected_fee_asset: fee_asset_denomination,
metrics,
}
.build()
.wrap_err("failed to initialize startup")?;
.build();

let startup_handle = startup::InfoHandle::new(state.subscribe());

// make submitter object
let (submitter, submitter_handle) = submitter::Builder {
shutdown_token: shutdown_handle.token(),
startup_handle: startup_handle.clone(),
sequencer_cometbft_endpoint,
sequencer_grpc_endpoint,
sequencer_cometbft_client,
sequencer_grpc_client,
sequencer_key_path,
sequencer_address_prefix: sequencer_address_prefix.clone(),
state: state.clone(),
Expand Down
33 changes: 12 additions & 21 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,40 +65,36 @@ pub(super) struct Builder {
pub(super) shutdown_token: CancellationToken,
pub(super) state: Arc<State>,
pub(super) sequencer_chain_id: String,
pub(super) sequencer_cometbft_endpoint: String,
pub(super) sequencer_grpc_endpoint: String,
pub(super) sequencer_cometbft_client: sequencer_client::HttpClient,
pub(super) sequencer_grpc_client: SequencerServiceClient<Channel>,
pub(super) sequencer_bridge_address: Address,
pub(super) expected_fee_asset: asset::Denom,
pub(super) metrics: &'static Metrics,
}

impl Builder {
pub(super) fn build(self) -> eyre::Result<Startup> {
pub(super) fn build(self) -> Startup {
let Self {
shutdown_token,
state,
sequencer_chain_id,
sequencer_cometbft_endpoint,
sequencer_cometbft_client,
sequencer_bridge_address,
sequencer_grpc_endpoint,
sequencer_grpc_client,
expected_fee_asset,
metrics,
} = self;

let sequencer_cometbft_client =
sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint)
.wrap_err("failed constructing cometbft http client")?;

Ok(Startup {
Startup {
shutdown_token,
state,
sequencer_chain_id,
sequencer_cometbft_client,
sequencer_grpc_endpoint,
sequencer_grpc_client,
sequencer_bridge_address,
expected_fee_asset,
metrics,
})
}
}
}

Expand Down Expand Up @@ -141,7 +137,7 @@ pub(super) struct Startup {
state: Arc<State>,
sequencer_chain_id: String,
sequencer_cometbft_client: sequencer_client::HttpClient,
sequencer_grpc_endpoint: String,
sequencer_grpc_client: SequencerServiceClient<Channel>,
sequencer_bridge_address: Address,
expected_fee_asset: asset::Denom,
metrics: &'static Metrics,
Expand All @@ -159,7 +155,7 @@ impl Startup {

wait_for_empty_mempool(
self.sequencer_cometbft_client.clone(),
self.sequencer_grpc_endpoint.clone(),
self.sequencer_grpc_client.clone(),
self.sequencer_bridge_address,
self.state.clone(),
self.metrics,
Expand Down Expand Up @@ -400,7 +396,7 @@ async fn ensure_mempool_empty(
#[instrument(skip_all, err)]
async fn wait_for_empty_mempool(
cometbft_client: sequencer_client::HttpClient,
sequencer_grpc_endpoint: String,
sequencer_grpc_client: SequencerServiceClient<Channel>,
address: Address,
state: Arc<State>,
metrics: &'static Metrics,
Expand All @@ -424,14 +420,9 @@ async fn wait_for_empty_mempool(
futures::future::ready(())
},
);
let sequencer_client = SequencerServiceClient::connect(sequencer_grpc_endpoint.clone())
.await
.wrap_err_with(|| {
format!("failed to connect to sequencer at `{sequencer_grpc_endpoint}`")
})?;

tryhard::retry_fn(|| {
let sequencer_client = sequencer_client.clone();
let sequencer_client = sequencer_grpc_client.clone();
let cometbft_client = cometbft_client.clone();
let state = state.clone();
ensure_mempool_empty(cometbft_client, sequencer_client, address, state, metrics)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use astria_core::generated::sequencerblock::v1alpha1::sequencer_service_client::SequencerServiceClient;
use astria_eyre::eyre::{
self,
Context as _,
Expand Down Expand Up @@ -47,8 +48,8 @@ pub(crate) struct Builder {
pub(crate) startup_handle: startup::InfoHandle,
pub(crate) sequencer_key_path: String,
pub(crate) sequencer_address_prefix: String,
pub(crate) sequencer_cometbft_endpoint: String,
pub(crate) sequencer_grpc_endpoint: String,
pub(crate) sequencer_cometbft_client: sequencer_client::HttpClient,
pub(crate) sequencer_grpc_client: SequencerServiceClient<tonic::transport::Channel>,
pub(crate) state: Arc<State>,
pub(crate) metrics: &'static Metrics,
}
Expand All @@ -61,8 +62,8 @@ impl Builder {
startup_handle,
sequencer_key_path,
sequencer_address_prefix,
sequencer_cometbft_endpoint,
sequencer_grpc_endpoint,
sequencer_cometbft_client,
sequencer_grpc_client,
state,
metrics,
} = self;
Expand All @@ -74,10 +75,6 @@ impl Builder {
.wrap_err("failed to load sequencer private key")?;
info!(address = %signer.address(), "loaded sequencer signer");

let sequencer_cometbft_client =
sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint)
.wrap_err("failed constructing cometbft http client")?;

let (batches_tx, batches_rx) = tokio::sync::mpsc::channel(BATCH_QUEUE_SIZE);
let handle = Handle::new(batches_tx);

Expand All @@ -88,7 +85,7 @@ impl Builder {
state,
batches_rx,
sequencer_cometbft_client,
sequencer_grpc_endpoint,
sequencer_grpc_client,
signer,
metrics,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ pub(super) struct Submitter {
state: Arc<State>,
batches_rx: mpsc::Receiver<Batch>,
sequencer_cometbft_client: sequencer_client::HttpClient,
sequencer_grpc_endpoint: String,
sequencer_grpc_client: SequencerServiceClient<Channel>,
signer: SequencerKey,
metrics: &'static Metrics,
}

impl Submitter {
pub(super) async fn run(mut self) -> eyre::Result<()> {
let (sequencer_chain_id, sequencer_grpc_client) = select! {
let sequencer_chain_id = select! {
() = self.shutdown_token.cancelled() => {
report_exit(Ok("submitter received shutdown signal while waiting for startup"));
return Ok(());
Expand All @@ -85,12 +85,8 @@ impl Submitter {
startup_info = self.startup_handle.get_info() => {
let startup::Info { chain_id, .. } = startup_info.wrap_err("submitter failed to get startup info")?;

let sequencer_grpc_client = sequencer_service_client::SequencerServiceClient::connect(
self.sequencer_grpc_endpoint.clone(),
).await.wrap_err("failed to connect to sequencer gRPC endpoint")?;

self.state.set_submitter_ready();
(chain_id, sequencer_grpc_client)
chain_id
}
};
self.state.set_submitter_ready();
Expand All @@ -110,7 +106,7 @@ impl Submitter {

// if batch submission fails, halt the submitter
if let Err(e) = self.process_batch(
sequencer_grpc_client.clone(),
self.sequencer_grpc_client.clone(),
&sequencer_chain_id,
actions,
rollup_height,
Expand Down

0 comments on commit f7ef132

Please sign in to comment.