diff --git a/core/lib/config/src/configs/da_client/eigen.rs b/core/lib/config/src/configs/da_client/eigen.rs index f94675871463..0084cfe05186 100644 --- a/core/lib/config/src/configs/da_client/eigen.rs +++ b/core/lib/config/src/configs/da_client/eigen.rs @@ -1,30 +1,8 @@ use serde::Deserialize; use zksync_basic_types::secrets::PrivateKey; - -pub const EIGEN_MEMSTORE_CLIENT_NAME: &str = "MemStore"; -pub const EIGEN_DISPERSER_CLIENT_NAME: &str = "Disperser"; - -#[derive(Clone, Debug, PartialEq, Deserialize)] -pub enum EigenConfig { - MemStore(MemStoreConfig), - Disperser(DisperserConfig), -} - -/// Configuration for the EigenDA in-memory client. -#[derive(Clone, Debug, PartialEq, Deserialize, Default)] -pub struct MemStoreConfig { - pub max_blob_size_bytes: u64, - /// Blob expiration time in seconds - pub blob_expiration: u64, - /// Latency in milliseconds for get operations - pub get_latency: u64, - /// Latency in milliseconds for put operations - pub put_latency: u64, -} - /// Configuration for the EigenDA remote disperser client. #[derive(Clone, Debug, PartialEq, Deserialize, Default)] -pub struct DisperserConfig { +pub struct EigenConfig { /// URL of the Disperser RPC server pub disperser_rpc: String, /// Block height needed to reach in order to consider the blob finalized diff --git a/core/lib/env_config/src/da_client.rs b/core/lib/env_config/src/da_client.rs index 7dc5ece5db59..7e711c5ff588 100644 --- a/core/lib/env_config/src/da_client.rs +++ b/core/lib/env_config/src/da_client.rs @@ -1,21 +1,17 @@ use std::env; -use zksync_config::{ - configs::{ - da_client::{ - avail::{ - AvailClientConfig, AvailSecrets, AVAIL_FULL_CLIENT_NAME, - AVAIL_GAS_RELAY_CLIENT_NAME, - }, - celestia::CelestiaSecrets, - eigen::{EigenSecrets, EIGEN_DISPERSER_CLIENT_NAME, EIGEN_MEMSTORE_CLIENT_NAME}, - DAClientConfig, AVAIL_CLIENT_CONFIG_NAME, CELESTIA_CLIENT_CONFIG_NAME, - EIGEN_CLIENT_CONFIG_NAME, OBJECT_STORE_CLIENT_CONFIG_NAME, +use zksync_config::configs::{ + da_client::{ + avail::{ + AvailClientConfig, AvailSecrets, AVAIL_FULL_CLIENT_NAME, AVAIL_GAS_RELAY_CLIENT_NAME, }, - secrets::DataAvailabilitySecrets, - AvailConfig, + celestia::CelestiaSecrets, + eigen::EigenSecrets, + DAClientConfig, AVAIL_CLIENT_CONFIG_NAME, CELESTIA_CLIENT_CONFIG_NAME, + EIGEN_CLIENT_CONFIG_NAME, OBJECT_STORE_CLIENT_CONFIG_NAME, }, - EigenConfig, + secrets::DataAvailabilitySecrets, + AvailConfig, }; use crate::{envy_load, FromEnv}; @@ -38,17 +34,7 @@ impl FromEnv for DAClientConfig { }, }), CELESTIA_CLIENT_CONFIG_NAME => Self::Celestia(envy_load("da_celestia_config", "DA_")?), - EIGEN_CLIENT_CONFIG_NAME => match env::var("DA_EIGEN_CLIENT_TYPE")?.as_str() { - EIGEN_DISPERSER_CLIENT_NAME => Self::Eigen(EigenConfig::Disperser(envy_load( - "da_eigen_config_disperser", - "DA_", - )?)), - EIGEN_MEMSTORE_CLIENT_NAME => Self::Eigen(EigenConfig::MemStore(envy_load( - "da_eigen_config_memstore", - "DA_", - )?)), - _ => anyhow::bail!("Unknown Eigen DA client type"), - }, + EIGEN_CLIENT_CONFIG_NAME => Self::Eigen(envy_load("da_eigen_config", "DA_")?), OBJECT_STORE_CLIENT_CONFIG_NAME => { Self::ObjectStore(envy_load("da_object_store", "DA_")?) } @@ -108,7 +94,6 @@ mod tests { configs::{ da_client::{ avail::{AvailClientConfig, AvailDefaultConfig}, - eigen::{DisperserConfig, MemStoreConfig}, DAClientConfig::{self, ObjectStore}, }, object_store::ObjectStoreMode::GCS, @@ -259,32 +244,7 @@ mod tests { } #[test] - fn from_env_eigen_client_memstore() { - let mut lock = MUTEX.lock(); - let config = r#" - DA_CLIENT="Eigen" - DA_EIGEN_CLIENT_TYPE="MemStore" - DA_MAX_BLOB_SIZE_BYTES=10 - DA_BLOB_EXPIRATION=20 - DA_GET_LATENCY=30 - DA_PUT_LATENCY=40 - "#; - lock.set_env(config); - - let actual = DAClientConfig::from_env().unwrap(); - assert_eq!( - actual, - DAClientConfig::Eigen(EigenConfig::MemStore(MemStoreConfig { - max_blob_size_bytes: 10, - blob_expiration: 20, - get_latency: 30, - put_latency: 40, - })) - ); - } - - #[test] - fn from_env_eigen_client_remote() { + fn from_env_eigen_client() { let mut lock = MUTEX.lock(); let config = r#" DA_CLIENT="Eigen" @@ -306,7 +266,7 @@ mod tests { let actual = DAClientConfig::from_env().unwrap(); assert_eq!( actual, - DAClientConfig::Eigen(EigenConfig::Disperser(DisperserConfig { + DAClientConfig::Eigen(EigenConfig { disperser_rpc: "http://localhost:8080".to_string(), eth_confirmation_depth: 0, eigenda_eth_rpc: "http://localhost:8545".to_string(), @@ -318,7 +278,7 @@ mod tests { authenticated: false, verify_cert: false, path_to_points: "resources".to_string(), - })) + }) ); } diff --git a/core/lib/protobuf_config/src/da_client.rs b/core/lib/protobuf_config/src/da_client.rs index 1ae882198cd3..02fb386d3762 100644 --- a/core/lib/protobuf_config/src/da_client.rs +++ b/core/lib/protobuf_config/src/da_client.rs @@ -4,7 +4,7 @@ use zksync_config::configs::{ da_client::{ avail::{AvailClientConfig, AvailConfig, AvailDefaultConfig, AvailGasRelayConfig}, celestia::CelestiaConfig, - eigen::{DisperserConfig, EigenConfig, MemStoreConfig}, + eigen::EigenConfig, DAClientConfig::{Avail, Celestia, Eigen, ObjectStore}, }, }; @@ -52,53 +52,31 @@ impl ProtoRepr for proto::DataAvailabilityClient { chain_id: required(&conf.chain_id).context("chain_id")?.clone(), timeout_ms: *required(&conf.timeout_ms).context("timeout_ms")?, }), - proto::data_availability_client::Config::Eigen(conf) => { - let config = required(&conf.config).context("config")?; - let eigen_config = match config { - proto::eigen_config::Config::MemStore(conf) => { - EigenConfig::MemStore(MemStoreConfig { - max_blob_size_bytes: *required(&conf.max_blob_size_bytes) - .context("max_blob_size_bytes")?, - blob_expiration: *required(&conf.blob_expiration) - .context("blob_expiration")?, - get_latency: *required(&conf.get_latency).context("get_latency")?, - put_latency: *required(&conf.put_latency).context("put_latency")?, - }) - } - proto::eigen_config::Config::Disperser(conf) => { - EigenConfig::Disperser(DisperserConfig { - disperser_rpc: required(&conf.disperser_rpc) - .context("disperser_rpc")? - .clone(), - eth_confirmation_depth: *required(&conf.eth_confirmation_depth) - .context("eth_confirmation_depth")?, - eigenda_eth_rpc: required(&conf.eigenda_eth_rpc) - .context("eigenda_eth_rpc")? - .clone(), - eigenda_svc_manager_address: required( - &conf.eigenda_svc_manager_address, - ) - .context("eigenda_svc_manager_address")? - .clone(), - blob_size_limit: *required(&conf.blob_size_limit) - .context("blob_size_limit")?, - status_query_timeout: *required(&conf.status_query_timeout) - .context("status_query_timeout")?, - status_query_interval: *required(&conf.status_query_interval) - .context("status_query_interval")?, - wait_for_finalization: *required(&conf.wait_for_finalization) - .context("wait_for_finalization")?, - authenticated: *required(&conf.authenticated) - .context("authenticated")?, - verify_cert: *required(&conf.verify_cert).context("verify_cert")?, - path_to_points: required(&conf.path_to_points) - .context("path_to_points")? - .clone(), - }) - } - }; - Eigen(eigen_config) - } + proto::data_availability_client::Config::Eigen(conf) => Eigen(EigenConfig { + disperser_rpc: required(&conf.disperser_rpc) + .context("disperser_rpc")? + .clone(), + eth_confirmation_depth: *required(&conf.eth_confirmation_depth) + .context("eth_confirmation_depth")?, + eigenda_eth_rpc: required(&conf.eigenda_eth_rpc) + .context("eigenda_eth_rpc")? + .clone(), + eigenda_svc_manager_address: required(&conf.eigenda_svc_manager_address) + .context("eigenda_svc_manager_address")? + .clone(), + blob_size_limit: *required(&conf.blob_size_limit).context("blob_size_limit")?, + status_query_timeout: *required(&conf.status_query_timeout) + .context("status_query_timeout")?, + status_query_interval: *required(&conf.status_query_interval) + .context("status_query_interval")?, + wait_for_finalization: *required(&conf.wait_for_finalization) + .context("wait_for_finalization")?, + authenticated: *required(&conf.authenticated).context("authenticated")?, + verify_cert: *required(&conf.verify_cert).context("verify_cert")?, + path_to_points: required(&conf.path_to_points) + .context("path_to_points")? + .clone(), + }), proto::data_availability_client::Config::ObjectStore(conf) => { ObjectStore(object_store_proto::ObjectStore::read(conf)?) } @@ -135,41 +113,19 @@ impl ProtoRepr for proto::DataAvailabilityClient { timeout_ms: Some(config.timeout_ms), }) } - Eigen(config) => match config { - EigenConfig::MemStore(config) => { - proto::data_availability_client::Config::Eigen(proto::EigenConfig { - config: Some(proto::eigen_config::Config::MemStore( - proto::MemStoreConfig { - max_blob_size_bytes: Some(config.max_blob_size_bytes), - blob_expiration: Some(config.blob_expiration), - get_latency: Some(config.get_latency), - put_latency: Some(config.put_latency), - }, - )), - }) - } - EigenConfig::Disperser(config) => { - proto::data_availability_client::Config::Eigen(proto::EigenConfig { - config: Some(proto::eigen_config::Config::Disperser( - proto::DisperserConfig { - disperser_rpc: Some(config.disperser_rpc.clone()), - eth_confirmation_depth: Some(config.eth_confirmation_depth), - eigenda_eth_rpc: Some(config.eigenda_eth_rpc.clone()), - eigenda_svc_manager_address: Some( - config.eigenda_svc_manager_address.clone(), - ), - blob_size_limit: Some(config.blob_size_limit), - status_query_timeout: Some(config.status_query_timeout), - status_query_interval: Some(config.status_query_interval), - wait_for_finalization: Some(config.wait_for_finalization), - authenticated: Some(config.authenticated), - verify_cert: Some(config.verify_cert), - path_to_points: Some(config.path_to_points.clone()), - }, - )), - }) - } - }, + Eigen(config) => proto::data_availability_client::Config::Eigen(proto::EigenConfig { + disperser_rpc: Some(config.disperser_rpc.clone()), + eth_confirmation_depth: Some(config.eth_confirmation_depth), + eigenda_eth_rpc: Some(config.eigenda_eth_rpc.clone()), + eigenda_svc_manager_address: Some(config.eigenda_svc_manager_address.clone()), + blob_size_limit: Some(config.blob_size_limit), + status_query_timeout: Some(config.status_query_timeout), + status_query_interval: Some(config.status_query_interval), + wait_for_finalization: Some(config.wait_for_finalization), + authenticated: Some(config.authenticated), + verify_cert: Some(config.verify_cert), + path_to_points: Some(config.path_to_points.clone()), + }), ObjectStore(config) => proto::data_availability_client::Config::ObjectStore( object_store_proto::ObjectStore::build(config), ), diff --git a/core/lib/protobuf_config/src/proto/config/da_client.proto b/core/lib/protobuf_config/src/proto/config/da_client.proto index a143d043ebe5..e8a82365a7ff 100644 --- a/core/lib/protobuf_config/src/proto/config/da_client.proto +++ b/core/lib/protobuf_config/src/proto/config/da_client.proto @@ -36,14 +36,7 @@ message CelestiaConfig { optional uint64 timeout_ms = 4; } -message MemStoreConfig { - optional uint64 max_blob_size_bytes = 3; - optional uint64 blob_expiration = 4; - optional uint64 get_latency = 5; - optional uint64 put_latency = 6; -} - -message DisperserConfig { +message EigenConfig { optional string disperser_rpc = 3; optional int32 eth_confirmation_depth = 4; optional string eigenda_eth_rpc = 5; @@ -57,13 +50,6 @@ message DisperserConfig { optional string path_to_points = 13; } -message EigenConfig { - oneof config { - MemStoreConfig mem_store = 1; - DisperserConfig disperser = 2; - } -} - message DataAvailabilityClient { // oneof in protobuf allows for None oneof config { diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 44ca7355ad47..7a463cd17f86 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -10,7 +10,7 @@ use zksync_da_client::{ DataAvailabilityClient, }; -use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser}; +use super::{blob_info::BlobInfo, sdk::RawEigenClient}; use crate::utils::to_non_retriable_da_error; /// EigenClient is a client for the Eigen DA service. @@ -19,7 +19,7 @@ use crate::utils::to_non_retriable_da_error; /// - Memstore: Stores blobs in memory, used for testing purposes. #[derive(Debug, Clone)] pub struct EigenClient { - client: Disperser, + client: Arc, } impl EigenClient { @@ -27,14 +27,10 @@ impl EigenClient { let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str()) .map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?; - let disperser: Disperser = match config.clone() { - EigenConfig::Disperser(config) => { - let client = RawEigenClient::new(private_key, config).await?; - Disperser::Remote(Arc::new(client)) - } - EigenConfig::MemStore(config) => Disperser::Memory(MemStore::new(config)), - }; - Ok(Self { client: disperser }) + let client = RawEigenClient::new(private_key, config).await?; + Ok(Self { + client: Arc::new(client), + }) } } @@ -54,17 +50,11 @@ impl DataAvailabilityClient for EigenClient { } } - let blob_id = match &self.client { - Disperser::Remote(remote_disperser) => remote_disperser - .dispatch_blob(data) - .await - .map_err(to_non_retriable_da_error)?, - Disperser::Memory(memstore) => memstore - .clone() - .put_blob(data) - .await - .map_err(to_non_retriable_da_error)?, - }; + let blob_id = self + .client + .dispatch_blob(data) + .await + .map_err(to_non_retriable_da_error)?; Ok(DispatchResponse::from(blob_id)) } @@ -89,28 +79,19 @@ impl DataAvailabilityClient for EigenClient { } fn blob_size_limit(&self) -> Option { - match self.client.clone() { - Disperser::Memory(mem_store) => Some(mem_store.config.max_blob_size_bytes as usize), - Disperser::Remote(raw_eigen_client) => { - Some(raw_eigen_client.config.blob_size_limit as usize) - } - } + Some(self.client.clone().config.blob_size_limit as usize) } } #[cfg(test)] impl EigenClient { pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result>, DAError> { - match &self.client { - Disperser::Remote(remote_client) => remote_client.get_blob_data(blob_id).await, - Disperser::Memory(memstore) => memstore.clone().get_blob_data(blob_id).await, - } + self.client.get_blob_data(blob_id).await } } #[cfg(test)] mod tests { use serial_test::serial; - use zksync_config::configs::da_client::eigen::{DisperserConfig, MemStoreConfig}; use zksync_types::secrets::PrivateKey; use super::*; @@ -119,7 +100,7 @@ mod tests { #[tokio::test] #[serial] async fn test_non_auth_dispersal() { - let config = EigenConfig::Disperser(DisperserConfig { + let config = EigenConfig { disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), eth_confirmation_depth: -1, eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(), @@ -131,7 +112,7 @@ mod tests { authenticated: false, verify_cert: true, path_to_points: "../../../resources".to_string(), - }); + }; let secrets = EigenSecrets { private_key: PrivateKey::from_str( "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", @@ -158,7 +139,7 @@ mod tests { #[tokio::test] #[serial] async fn test_auth_dispersal() { - let config = EigenConfig::Disperser(DisperserConfig { + let config = EigenConfig { disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), eth_confirmation_depth: -1, eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(), @@ -170,7 +151,7 @@ mod tests { authenticated: true, verify_cert: true, path_to_points: "../../../resources".to_string(), - }); + }; let secrets = EigenSecrets { private_key: PrivateKey::from_str( "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", @@ -194,42 +175,10 @@ mod tests { assert_eq!(retrieved_data.unwrap(), data); } - #[tokio::test] - async fn test_eigenda_memory_disperser() { - let config = EigenConfig::MemStore(MemStoreConfig { - max_blob_size_bytes: 2 * 1024 * 1024, // 2MB, - blob_expiration: 60 * 2, - get_latency: 0, - put_latency: 0, - }); - let secrets = EigenSecrets { - private_key: PrivateKey::from_str( - "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", - ) - .unwrap(), - }; - let client = EigenClient::new(config, secrets).await.unwrap(); - let data = vec![1u8; 100]; - let result = client.dispatch_blob(0, data.clone()).await.unwrap(); - - let blob_info: BlobInfo = - rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); - let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; - let actual_inclusion_data = client - .get_inclusion_data(&result.blob_id) - .await - .unwrap() - .unwrap() - .data; - assert_eq!(expected_inclusion_data, actual_inclusion_data); - - let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); - assert_eq!(retrieved_data.unwrap(), data); - } #[tokio::test] #[serial] async fn test_wait_for_finalization() { - let config = EigenConfig::Disperser(DisperserConfig { + let config = EigenConfig { disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), blob_size_limit: 2 * 1024 * 1024, // 2MB status_query_timeout: 1800000, // 30 minutes @@ -241,7 +190,7 @@ mod tests { eth_confirmation_depth: 0, eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(), eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), - }); + }; let secrets = EigenSecrets { private_key: PrivateKey::from_str( "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", @@ -267,12 +216,19 @@ mod tests { #[tokio::test] async fn test_eigenda_dispatch_blob_too_large() { - let config = EigenConfig::MemStore(MemStoreConfig { - max_blob_size_bytes: 99, - blob_expiration: 60 * 2, - get_latency: 0, - put_latency: 0, - }); + let config = EigenConfig { + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + blob_size_limit: 99, + status_query_timeout: 1800000, // 30 minutes + status_query_interval: 5000, // 5000 ms + wait_for_finalization: true, + authenticated: true, + verify_cert: true, + path_to_points: "../../../resources".to_string(), + eth_confirmation_depth: 0, + eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(), + eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), + }; let secrets = EigenSecrets { private_key: PrivateKey::from_str( "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", @@ -294,7 +250,7 @@ mod tests { #[tokio::test] #[serial] async fn test_eth_confirmation_depth() { - let config = EigenConfig::Disperser(DisperserConfig { + let config = EigenConfig { disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), eth_confirmation_depth: 5, eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(), @@ -306,7 +262,7 @@ mod tests { authenticated: false, verify_cert: true, path_to_points: "../../../resources".to_string(), - }); + }; let secrets = EigenSecrets { private_key: PrivateKey::from_str( "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", @@ -333,7 +289,7 @@ mod tests { #[tokio::test] #[serial] async fn test_auth_dispersal_eth_confirmation_depth() { - let config = EigenConfig::Disperser(DisperserConfig { + let config = EigenConfig { disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), eth_confirmation_depth: 5, eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(), @@ -345,7 +301,7 @@ mod tests { authenticated: true, verify_cert: true, path_to_points: "../../../resources".to_string(), - }); + }; let secrets = EigenSecrets { private_key: PrivateKey::from_str( "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index ff3957af7251..71de73482bb3 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -17,35 +17,20 @@ Changes needed both for local and mainnet/testnet setup. 1. Add `da_client` to `etc/env/file_based/general.yaml`: -If you want to use memstore: - -```yaml -da_client: - eigen: - mem_store: - max_blob_size_bytes: 2097152 - blob_expiration: 100000 - get_latency: 100 - put_latency: 100 -``` - -If you want to use disperser: - ```yaml da_client: eigen: - disperser: - disperser_rpc: - eth_confirmation_depth: -1 - eigenda_eth_rpc: - eigenda_svc_manager_address: '0xD4A7E1Bd8015057293f0D0A557088c286942e84b' - blob_size_limit: 2097152 - status_query_timeout: 1800000 # ms - status_query_interval: 5 # ms - wait_for_finalization: false - authenticated: false - verify_cert: true - path_to_points: ./resources + disperser_rpc: + eth_confirmation_depth: -1 + eigenda_eth_rpc: + eigenda_svc_manager_address: '0xD4A7E1Bd8015057293f0D0A557088c286942e84b' + blob_size_limit: 2097152 + status_query_timeout: 1800000 # ms + status_query_interval: 5 # ms + wait_for_finalization: false + authenticated: false + verify_cert: true + path_to_points: ./resources ``` Also set the private key in `etc/env/file_based/secrets.yaml`: diff --git a/core/node/da_clients/src/eigen/memstore.rs b/core/node/da_clients/src/eigen/memstore.rs deleted file mode 100644 index 8a10137a2b93..000000000000 --- a/core/node/da_clients/src/eigen/memstore.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - time::{Duration, Instant}, -}; - -use anyhow::Error; -use rand::{rngs::OsRng, Rng, RngCore}; -use sha3::{Digest, Keccak256}; -use tokio::time::interval; -use zksync_config::configs::da_client::eigen::MemStoreConfig; -use zksync_da_client::types::{DAError, InclusionData}; - -use super::blob_info::{self, BlobInfo}; - -#[derive(Debug, PartialEq)] -pub enum MemStoreError { - BlobToLarge, - BlobAlreadyExists, - IncorrectCommitment, - #[cfg(test)] - BlobNotFound, -} - -impl From for Error { - fn from(val: MemStoreError) -> Self { - match val { - MemStoreError::BlobToLarge => Error::msg("Blob too large"), - MemStoreError::BlobAlreadyExists => Error::msg("Blob already exists"), - MemStoreError::IncorrectCommitment => Error::msg("Incorrect commitment"), - #[cfg(test)] - MemStoreError::BlobNotFound => Error::msg("Blob not found"), - } - } -} - -#[derive(Debug)] -struct MemStoreData { - store: HashMap>, - key_starts: HashMap, -} - -/// This struct represents a memory store for blobs. -/// It should be used for testing purposes only. -#[derive(Clone, Debug)] -pub struct MemStore { - pub config: MemStoreConfig, - data: Arc>, -} - -impl MemStore { - pub fn new(config: MemStoreConfig) -> Arc { - let memstore = Arc::new(Self { - config, - data: Arc::new(RwLock::new(MemStoreData { - store: HashMap::new(), - key_starts: HashMap::new(), - })), - }); - let store_clone = Arc::clone(&memstore); - tokio::spawn(async move { - store_clone.pruning_loop().await; - }); - memstore - } - - /// Saves a blob to the memory store, it harcodes the blob info, since we don't care about it in a memory based store - pub async fn put_blob(self: Arc, value: Vec) -> Result { - tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await; - if value.len() as u64 > self.config.max_blob_size_bytes { - return Err(MemStoreError::BlobToLarge); - } - - let mut entropy = [0u8; 10]; - OsRng.fill_bytes(&mut entropy); - - let mut hasher = Keccak256::new(); - hasher.update(entropy); - let mock_batch_root = hasher.finalize().to_vec(); - - let block_num = OsRng.gen_range(0u32..1000); - - let blob_info = blob_info::BlobInfo { - blob_header: blob_info::BlobHeader { - commitment: blob_info::G1Commitment { - // todo: generate real commitment - x: vec![0u8; 32], - y: vec![0u8; 32], - }, - data_length: value.len() as u32, - blob_quorum_params: vec![blob_info::BlobQuorumParam { - quorum_number: 1, - adversary_threshold_percentage: 29, - confirmation_threshold_percentage: 30, - chunk_length: 300, - }], - }, - blob_verification_proof: blob_info::BlobVerificationProof { - batch_medatada: blob_info::BatchMetadata { - batch_header: blob_info::BatchHeader { - batch_root: mock_batch_root.clone(), - quorum_numbers: vec![0x1, 0x0], - quorum_signed_percentages: vec![0x60, 0x90], - reference_block_number: block_num, - }, - signatory_record_hash: mock_batch_root, - fee: vec![], - confirmation_block_number: block_num, - batch_header_hash: vec![], - }, - batch_id: 69, - blob_index: 420, - inclusion_proof: entropy.to_vec(), - quorum_indexes: vec![0x1, 0x0], - }, - }; - - let cert_bytes = rlp::encode(&blob_info).to_vec(); - - let key = String::from_utf8_lossy( - blob_info - .blob_verification_proof - .inclusion_proof - .clone() - .as_slice(), - ) - .to_string(); - - let mut data = self.data.write().unwrap(); - - if data.store.contains_key(key.as_str()) { - return Err(MemStoreError::BlobAlreadyExists); - } - - data.key_starts.insert(key.clone(), Instant::now()); - data.store.insert(key, value); - Ok(hex::encode(cert_bytes)) - } - - /// It returns the inclusion proof - pub async fn get_inclusion_data( - self: Arc, - blob_id: &str, - ) -> anyhow::Result, DAError> { - let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError { - error: MemStoreError::IncorrectCommitment.into(), - is_retriable: false, - })?; - let blob_info: BlobInfo = rlp::decode(&rlp_encoded_bytes).map_err(|_| DAError { - error: MemStoreError::IncorrectCommitment.into(), - is_retriable: false, - })?; - let inclusion_data = blob_info.blob_verification_proof.inclusion_proof; - Ok(Some(InclusionData { - data: inclusion_data, - })) - } - - /// This function is only used on tests, it returns the blob data - #[cfg(test)] - pub async fn get_blob_data( - self: Arc, - blob_id: &str, - ) -> anyhow::Result>, DAError> { - tokio::time::sleep(Duration::from_millis(self.config.get_latency)).await; - let request_id = hex::decode(blob_id).map_err(|_| DAError { - error: MemStoreError::IncorrectCommitment.into(), - is_retriable: false, - })?; - let blob_info: BlobInfo = rlp::decode(&request_id).map_err(|_| DAError { - error: MemStoreError::IncorrectCommitment.into(), - is_retriable: false, - })?; - let key = String::from_utf8_lossy( - blob_info - .blob_verification_proof - .inclusion_proof - .clone() - .as_slice(), - ) - .to_string(); - - let data = self.data.read().map_err(|_| DAError { - error: MemStoreError::BlobNotFound.into(), - is_retriable: false, - })?; - match data.store.get(&key) { - Some(value) => Ok(Some(value.clone())), - None => Err(DAError { - error: MemStoreError::BlobNotFound.into(), - is_retriable: false, - }), - } - } - - /// After some time has passed, blobs are removed from the store - async fn prune_expired(self: Arc) { - let mut data = self.data.write().unwrap(); - let mut to_remove = vec![]; - for (key, start) in data.key_starts.iter() { - if start.elapsed() > Duration::from_secs(self.config.blob_expiration) { - to_remove.push(key.clone()); - } - } - for key in to_remove { - data.store.remove(&key); - data.key_starts.remove(&key); - } - } - - /// Loop used to prune expired blobs - async fn pruning_loop(self: Arc) { - let mut interval = interval(Duration::from_secs(self.config.blob_expiration)); - - loop { - interval.tick().await; - let self_clone = Arc::clone(&self); - self_clone.prune_expired().await; - } - } -} diff --git a/core/node/da_clients/src/eigen/mod.rs b/core/node/da_clients/src/eigen/mod.rs index 02ec82a70815..9446194f48c7 100644 --- a/core/node/da_clients/src/eigen/mod.rs +++ b/core/node/da_clients/src/eigen/mod.rs @@ -1,15 +1,9 @@ mod blob_info; mod client; mod generated; -mod memstore; mod sdk; mod verifier; -use std::sync::Arc; - -use memstore::MemStore; -use sdk::RawEigenClient; - pub use self::client::EigenClient; #[allow(clippy::all)] @@ -21,9 +15,3 @@ pub(crate) mod disperser { pub(crate) mod common { include!("generated/common.rs"); } - -#[derive(Clone, Debug)] -pub(crate) enum Disperser { - Remote(Arc), - Memory(Arc), -} diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index ef3175b39c5d..d4ea5ada846b 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -8,7 +8,7 @@ use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, }; -use zksync_config::configs::da_client::eigen::DisperserConfig; +use zksync_config::EigenConfig; #[cfg(test)] use zksync_da_client::types::DAError; @@ -31,7 +31,7 @@ use crate::eigen::{ pub(crate) struct RawEigenClient { client: DisperserClient, private_key: SecretKey, - pub config: DisperserConfig, + pub config: EigenConfig, verifier: Verifier, } @@ -41,7 +41,7 @@ pub(crate) const AVG_BLOCK_TIME: u64 = 12; impl RawEigenClient { pub(crate) const BUFFER_SIZE: usize = 1000; - pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result { + pub async fn new(private_key: SecretKey, config: EigenConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; let client = DisperserClient::connect(endpoint)