diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 175297b9415b..1066065adc10 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -11,7 +11,7 @@ use zksync_da_client::{ }; use super::{blob_info::BlobInfo, sdk::RawEigenClient}; -use crate::utils::to_non_retriable_da_error; +use crate::utils::to_retriable_da_error; /// EigenClient is a client for the Eigen DA service. /// It can be configured to use one of two dispersal methods: @@ -50,7 +50,7 @@ impl DataAvailabilityClient for EigenClient { .client .dispatch_blob(data) .await - .map_err(to_non_retriable_da_error)?; + .map_err(to_retriable_da_error)?; Ok(DispatchResponse::from(blob_id)) } @@ -59,7 +59,7 @@ impl DataAvailabilityClient for EigenClient { let blob_info = self .get_commitment(blob_id) .await - .map_err(to_non_retriable_da_error)?; + .map_err(to_retriable_da_error)?; let rlp_encoded_bytes = hex::decode(blob_info).map_err(|_| DAError { error: anyhow!("Failed to decode blob_id"), is_retriable: false, diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 0704b26c9a23..208a2f138c7e 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -1,9 +1,12 @@ -use std::{str::FromStr, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use backon::{ConstantBuilder, Retryable}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; -use tokio::{sync::mpsc, time::Instant}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio::{ + sync::{mpsc, Mutex}, + time::Instant, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, @@ -28,7 +31,7 @@ use crate::eigen::{ #[derive(Debug, Clone)] pub(crate) struct RawEigenClient { - client: DisperserClient, + client: Arc>>, private_key: SecretKey, pub config: EigenConfig, verifier: Verifier, @@ -38,15 +41,12 @@ pub(crate) const DATA_CHUNK_SIZE: usize = 32; pub(crate) const AVG_BLOCK_TIME: u64 = 12; impl RawEigenClient { - pub(crate) const BUFFER_SIZE: usize = 1000; const BLOB_SIZE_LIMIT: usize = 1024 * 1024 * 2; // 2 MB pub async fn new(private_key: SecretKey, config: EigenConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; - let client = DisperserClient::connect(endpoint) - .await - .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?; + let client = Arc::new(Mutex::new(DisperserClient::connect(endpoint).await?)); let verifier_config = VerifierConfig { rpc_url: config.eigenda_eth_rpc.clone(), @@ -80,8 +80,13 @@ impl RawEigenClient { account_id: String::default(), // Account Id is not used in non-authenticated mode }; - let mut client_clone = self.client.clone(); - let disperse_reply = client_clone.disperse_blob(request).await?.into_inner(); + let disperse_reply = self + .client + .lock() + .await + .disperse_blob(request) + .await? + .into_inner(); Ok(hex::encode(disperse_reply.request_id)) } @@ -106,24 +111,27 @@ impl RawEigenClient { } async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { - let mut client_clone = self.client.clone(); - let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); - - let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx)); - let padded_data = convert_by_padding_empty_byte(&data); + let (tx, rx) = mpsc::unbounded_channel(); // 1. send DisperseBlobRequest - self.disperse_data(padded_data, &tx).await?; + let padded_data = convert_by_padding_empty_byte(&data); + self.disperse_data(padded_data, &tx)?; // this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest` - let mut response_stream = response_stream.await?.into_inner(); + let mut response_stream = self + .client + .clone() + .lock() + .await + .disperse_blob_authenticated(UnboundedReceiverStream::new(rx)) + .await?; + let response_stream = response_stream.get_mut(); // 2. receive BlobAuthHeader - let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?; + let blob_auth_header = self.receive_blob_auth_header(response_stream).await?; // 3. sign and send BlobAuthHeader - self.submit_authentication_data(blob_auth_header.clone(), &tx) - .await?; + self.submit_authentication_data(blob_auth_header.clone(), &tx)?; // 4. receive DisperseBlobReply let reply = response_stream @@ -141,11 +149,8 @@ impl RawEigenClient { } pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result { - let client_clone = self.client.clone(); let disperse_time = Instant::now(); - let blob_info = self - .await_for_inclusion(client_clone, blob_id.to_string()) - .await?; + let blob_info = self.await_for_inclusion(blob_id.to_string()).await?; let blob_info = blob_info::BlobInfo::try_from(blob_info) .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; @@ -181,10 +186,10 @@ impl RawEigenClient { } } - async fn disperse_data( + fn disperse_data( &self, data: Vec, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { let req = disperser::AuthenticatedRequest { payload: Some(DisperseRequest(disperser::DisperseBlobRequest { @@ -195,14 +200,13 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e)) } - async fn submit_authentication_data( + fn submit_authentication_data( &self, blob_auth_header: BlobAuthHeader, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { // TODO: replace challenge_parameter with actual auth header when it is available let digest = zksync_basic_types::web3::keccak256( @@ -226,7 +230,6 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e)) } @@ -254,18 +257,16 @@ impl RawEigenClient { } } - async fn await_for_inclusion( - &self, - client: DisperserClient, - request_id: String, - ) -> anyhow::Result { + async fn await_for_inclusion(&self, request_id: String) -> anyhow::Result { let polling_request = disperser::BlobStatusRequest { request_id: hex::decode(request_id)?, }; let blob_info = (|| async { - let mut client_clone = client.clone(); - let resp = client_clone + let resp = self + .client + .lock() + .await .get_blob_status(polling_request.clone()) .await? .into_inner(); @@ -331,7 +332,8 @@ impl RawEigenClient { .batch_header_hash; let get_response = self .client - .clone() + .lock() + .await .retrieve_blob(disperser::RetrieveBlobRequest { batch_header_hash, blob_index,