Skip to content

Commit

Permalink
feat(eigen-client-extra-features): Fix PR comments (#369)
Browse files Browse the repository at this point in the history
* Add envy load

* Readd proto reference

* Rename blob id to request id

* Make literals constants

* Make point size constant

* Get pool unique

* Remaining comments

* Fix comment

* Add check for failed states

* Change l1 name
  • Loading branch information
gianbelinche authored Dec 11, 2024
1 parent 83c3e13 commit fe8f68c
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 67 deletions.
6 changes: 5 additions & 1 deletion core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ impl MainNodeBuilder {
};

let secrets = try_load_config!(self.secrets.data_availability);
let l1_secrets = try_load_config!(self.secrets.l1);
match (da_client_config, secrets) {
(DAClientConfig::Avail(config), DataAvailabilitySecrets::Avail(secret)) => {
self.node.add_layer(AvailWiringLayer::new(config, secret));
Expand All @@ -535,7 +536,10 @@ impl MainNodeBuilder {
.add_layer(CelestiaWiringLayer::new(config, secret));
}

(DAClientConfig::Eigen(config), DataAvailabilitySecrets::Eigen(secret)) => {
(DAClientConfig::Eigen(mut config), DataAvailabilitySecrets::Eigen(secret)) => {
if config.eigenda_eth_rpc.is_none() {
config.eigenda_eth_rpc = Some(l1_secrets.l1_rpc_url.expose_str().to_string());
}
self.node.add_layer(EigenWiringLayer::new(config, secret));
}

Expand Down
2 changes: 1 addition & 1 deletion core/lib/config/src/configs/da_client/eigen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct EigenConfig {
/// a value less or equal to 0 means that the disperser will not wait for finalization
pub settlement_layer_confirmation_depth: i32,
/// URL of the Ethereum RPC server
pub eigenda_eth_rpc: String,
pub eigenda_eth_rpc: Option<String>,
/// Address of the service manager contract
pub eigenda_svc_manager_address: String,
/// Wait for the blob to be finalized before returning the response
Expand Down
59 changes: 21 additions & 38 deletions core/lib/env_config/src/da_client.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use std::env;

use zksync_config::{
configs::{
da_client::{
avail::{
AvailClientConfig, AvailSecrets, AVAIL_FULL_CLIENT_NAME,
AVAIL_GAS_RELAY_CLIENT_NAME,
},
celestia::CelestiaSecrets,
eigen::EigenSecrets,
DAClientConfig, AVAIL_CLIENT_CONFIG_NAME, CELESTIA_CLIENT_CONFIG_NAME,
EIGEN_CLIENT_CONFIG_NAME, OBJECT_STORE_CLIENT_CONFIG_NAME,
use zksync_config::configs::{
da_client::{
avail::{
AvailClientConfig, AvailSecrets, AVAIL_FULL_CLIENT_NAME, AVAIL_GAS_RELAY_CLIENT_NAME,
},
secrets::DataAvailabilitySecrets,
AvailConfig,
celestia::CelestiaSecrets,
eigen::EigenSecrets,
DAClientConfig, AVAIL_CLIENT_CONFIG_NAME, CELESTIA_CLIENT_CONFIG_NAME,
EIGEN_CLIENT_CONFIG_NAME, OBJECT_STORE_CLIENT_CONFIG_NAME,
},
EigenConfig,
secrets::DataAvailabilitySecrets,
AvailConfig,
};

use crate::{envy_load, FromEnv};
Expand All @@ -38,20 +34,7 @@ impl FromEnv for DAClientConfig {
},
}),
CELESTIA_CLIENT_CONFIG_NAME => Self::Celestia(envy_load("da_celestia_config", "DA_")?),
EIGEN_CLIENT_CONFIG_NAME => Self::Eigen(EigenConfig {
disperser_rpc: env::var("EIGENDA_DISPERSER_RPC")?,
settlement_layer_confirmation_depth: env::var(
"EIGENDA_SETTLEMENT_LAYER_CONFIRMATION_DEPTH",
)?
.parse()?,
eigenda_eth_rpc: env::var("EIGENDA_EIGENDA_ETH_RPC")?,
eigenda_svc_manager_address: env::var("EIGENDA_EIGENDA_SVC_MANAGER_ADDRESS")?,
wait_for_finalization: env::var("EIGENDA_WAIT_FOR_FINALIZATION")?.parse()?,
authenticated: env::var("EIGENDA_AUTHENTICATED")?.parse()?,
g1_url: env::var("EIGENDA_G1_URL")?.parse()?,
g2_url: env::var("EIGENDA_G2_URL")?.parse()?,
chain_id: env::var("EIGENDA_CHAIN_ID")?.parse()?,
}),
EIGEN_CLIENT_CONFIG_NAME => Self::Eigen(envy_load("da_eigen_config", "DA_")?),
OBJECT_STORE_CLIENT_CONFIG_NAME => {
Self::ObjectStore(envy_load("da_object_store", "DA_")?)
}
Expand Down Expand Up @@ -265,15 +248,15 @@ mod tests {
let mut lock = MUTEX.lock();
let config = r#"
DA_CLIENT="Eigen"
EIGENDA_DISPERSER_RPC="http://localhost:8080"
EIGENDA_SETTLEMENT_LAYER_CONFIRMATION_DEPTH=0
EIGENDA_EIGENDA_ETH_RPC="http://localhost:8545"
EIGENDA_EIGENDA_SVC_MANAGER_ADDRESS="0x123"
EIGENDA_WAIT_FOR_FINALIZATION=true
EIGENDA_AUTHENTICATED=false
EIGENDA_G1_URL="resources1"
EIGENDA_G2_URL="resources2"
EIGENDA_CHAIN_ID=1
DA_DISPERSER_RPC="http://localhost:8080"
DA_SETTLEMENT_LAYER_CONFIRMATION_DEPTH=0
DA_EIGENDA_ETH_RPC="http://localhost:8545"
DA_EIGENDA_SVC_MANAGER_ADDRESS="0x123"
DA_WAIT_FOR_FINALIZATION=true
DA_AUTHENTICATED=false
DA_G1_URL="resources1"
DA_G2_URL="resources2"
DA_CHAIN_ID=1
"#;
lock.set_env(config);

Expand All @@ -283,7 +266,7 @@ mod tests {
DAClientConfig::Eigen(EigenConfig {
disperser_rpc: "http://localhost:8080".to_string(),
settlement_layer_confirmation_depth: 0,
eigenda_eth_rpc: "http://localhost:8545".to_string(),
eigenda_eth_rpc: Some("http://localhost:8545".to_string()),
eigenda_svc_manager_address: "0x123".to_string(),
wait_for_finalization: true,
authenticated: false,
Expand Down
9 changes: 5 additions & 4 deletions core/lib/protobuf_config/src/da_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ impl ProtoRepr for proto::DataAvailabilityClient {
&conf.settlement_layer_confirmation_depth,
)
.context("settlement_layer_confirmation_depth")?,
eigenda_eth_rpc: required(&conf.eigenda_eth_rpc)
.context("eigenda_eth_rpc")?
.clone(),
eigenda_eth_rpc: match required(&conf.eigenda_eth_rpc) {
Ok(rpc) => Some(rpc.clone()),
Err(_) => None,
},
eigenda_svc_manager_address: required(&conf.eigenda_svc_manager_address)
.context("eigenda_svc_manager_address")?
.clone(),
Expand Down Expand Up @@ -117,7 +118,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
settlement_layer_confirmation_depth: Some(
config.settlement_layer_confirmation_depth,
),
eigenda_eth_rpc: Some(config.eigenda_eth_rpc.clone()),
eigenda_eth_rpc: config.eigenda_eth_rpc.clone(),
eigenda_svc_manager_address: Some(config.eigenda_svc_manager_address.clone()),
wait_for_finalization: Some(config.wait_for_finalization),
authenticated: Some(config.authenticated),
Expand Down
3 changes: 3 additions & 0 deletions core/node/da_clients/src/eigen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ pub fn compile_protos() {

The generated folder is considered a temporary solution until the EigenDA has a library with either a protogen, or
preferably a full Rust client implementation.

proto files are not included here to not create confusion in case they are not updated in time, so the EigenDA
[repo](https://github.com/Layr-Labs/eigenda/tree/master/api/proto) has to be a source of truth for the proto files.
10 changes: 5 additions & 5 deletions core/node/da_clients/src/eigen/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: -1,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: false,
Expand Down Expand Up @@ -111,7 +111,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: -1,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: true,
Expand Down Expand Up @@ -155,7 +155,7 @@ mod tests {
g1_url: "https://github.com/Layr-Labs/eigenda-proxy/raw/2fd70b99ef5bf137d7bbca3461cf9e1f2c899451/resources/g1.point".to_string(),
g2_url: "https://github.com/Layr-Labs/eigenda-proxy/raw/2fd70b99ef5bf137d7bbca3461cf9e1f2c899451/resources/g2.point.powerOf2".to_string(),
settlement_layer_confirmation_depth: 0,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
chain_id: 17000,
};
Expand Down Expand Up @@ -191,7 +191,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: 5,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: false,
Expand Down Expand Up @@ -231,7 +231,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: 5,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: true,
Expand Down
42 changes: 31 additions & 11 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::eigen::{
disperser_client::DisperserClient,
AuthenticatedReply, BlobAuthHeader,
},
verifier::VerificationError,
};

#[derive(Debug, Clone)]
Expand All @@ -53,7 +54,10 @@ impl<T: GetBlobData> RawEigenClient<T> {
let client = Arc::new(Mutex::new(DisperserClient::connect(endpoint).await?));

let verifier_config = VerifierConfig {
rpc_url: config.eigenda_eth_rpc.clone(),
rpc_url: config
.eigenda_eth_rpc
.clone()
.ok_or(anyhow::anyhow!("EigenDA ETH RPC not set"))?,
svc_manager_addr: config.eigenda_svc_manager_address.clone(),
max_blob_size: Self::BLOB_SIZE_LIMIT as u32,
g1_url: config.g1_url.clone(),
Expand Down Expand Up @@ -109,7 +113,13 @@ impl<T: GetBlobData> RawEigenClient<T> {
.await?
.into_inner();

Ok(hex::encode(disperse_reply.request_id))
match disperser::BlobStatus::try_from(disperse_reply.result)? {
disperser::BlobStatus::Failed
| disperser::BlobStatus::InsufficientSignatures
| disperser::BlobStatus::Unknown => Err(anyhow::anyhow!("Blob dispatch failed")),

_ => Ok(hex::encode(disperse_reply.request_id)),
}
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
Expand Down Expand Up @@ -147,11 +157,18 @@ impl<T: GetBlobData> RawEigenClient<T> {
let disperser::authenticated_reply::Payload::DisperseReply(disperse_reply) = reply else {
return Err(anyhow::anyhow!("Unexpected response from server"));
};
Ok(hex::encode(disperse_reply.request_id))

match disperser::BlobStatus::try_from(disperse_reply.result)? {
disperser::BlobStatus::Failed
| disperser::BlobStatus::InsufficientSignatures
| disperser::BlobStatus::Unknown => Err(anyhow::anyhow!("Blob dispatch failed")),

_ => Ok(hex::encode(disperse_reply.request_id)),
}
}

pub async fn get_commitment(&self, blob_id: &str) -> anyhow::Result<Option<BlobInfo>> {
let blob_info = self.try_get_inclusion_data(blob_id.to_string()).await?;
pub async fn get_commitment(&self, request_id: &str) -> anyhow::Result<Option<BlobInfo>> {
let blob_info = self.try_get_inclusion_data(request_id.to_string()).await?;

let Some(blob_info) = blob_info else {
return Ok(None);
Expand All @@ -162,7 +179,7 @@ impl<T: GetBlobData> RawEigenClient<T> {
let Some(data) = self.get_blob_data(blob_info.clone()).await? else {
return Err(anyhow::anyhow!("Failed to get blob data"));
};
let data_db = self.get_blob_data.call(blob_id).await?;
let data_db = self.get_blob_data.call(request_id).await?;
if let Some(data_db) = data_db {
if data_db != data {
return Err(anyhow::anyhow!(
Expand All @@ -179,16 +196,19 @@ impl<T: GetBlobData> RawEigenClient<T> {
.verify_inclusion_data_against_settlement_layer(blob_info.clone())
.await;
// in case of an error, the dispatcher will retry, so the need to return None
if result.is_err() {
return Ok(None);
if let Err(e) = result {
match e {
VerificationError::EmptyHash => return Ok(None),
_ => return Err(anyhow::anyhow!("Failed to verify inclusion data: {:?}", e)),
}
}

tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id);
tracing::info!("Blob dispatch confirmed, request id: {}", request_id);
Ok(Some(blob_info))
}

pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result<Option<Vec<u8>>> {
let blob_info = self.get_commitment(blob_id).await?;
pub async fn get_inclusion_data(&self, request_id: &str) -> anyhow::Result<Option<Vec<u8>>> {
let blob_info = self.get_commitment(request_id).await?;
if let Some(blob_info) = blob_info {
Ok(Some(blob_info.blob_verification_proof.inclusion_proof))
} else {
Expand Down
13 changes: 8 additions & 5 deletions core/node/da_clients/src/eigen/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ impl Clone for Verifier {
impl Verifier {
pub const DEFAULT_PRIORITY_FEE_PER_GAS: u64 = 100;
pub const SRSORDER: u32 = 268435456; // 2 ^ 28
pub const G1POINT: &'static str = "g1.point";
pub const G2POINT: &'static str = "g2.point.powerOf2";
pub const POINT_SIZE: u32 = 32;

async fn save_point(url: String, point: String) -> Result<(), VerificationError> {
let url = Url::parse(&url).map_err(|_| VerificationError::LinkError)?;
Expand All @@ -120,21 +123,21 @@ impl Verifier {
Ok(())
}
async fn save_points(url_g1: String, url_g2: String) -> Result<String, VerificationError> {
Self::save_point(url_g1.clone(), "g1.point".to_string()).await?;
Self::save_point(url_g2.clone(), "g2.point.powerOf2".to_string()).await?;
Self::save_point(url_g1.clone(), Self::G1POINT.to_string()).await?;
Self::save_point(url_g2.clone(), Self::G2POINT.to_string()).await?;

Ok(".".to_string())
}
pub async fn new<T: VerifierClient + 'static>(
cfg: VerifierConfig,
signing_client: T,
) -> Result<Self, VerificationError> {
let srs_points_to_load = cfg.max_blob_size / 32;
let srs_points_to_load = cfg.max_blob_size / Self::POINT_SIZE;
let path = Self::save_points(cfg.clone().g1_url, cfg.clone().g2_url).await?;
let kzg = Kzg::setup(
&format!("{}{}", path, "/g1.point"),
&format!("{}/{}", path, Self::G1POINT),
"",
&format!("{}{}", path, "/g2.point.powerOf2"),
&format!("{}/{}", path, Self::G2POINT),
Self::SRSORDER,
srs_points_to_load,
"".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl WiringLayer for EigenWiringLayer {
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let master_pool = input.master_pool.get_custom(2).await?;
let master_pool = input.master_pool.get().await?;
let get_blob_from_db = GetBlobFromDB { pool: master_pool };
let client: Box<dyn DataAvailabilityClient> = Box::new(
EigenClient::new(self.config, self.secrets, Box::new(get_blob_from_db)).await?,
Expand All @@ -69,7 +69,7 @@ impl GetBlobData for GetBlobFromDB {
async fn call(&self, input: &'_ str) -> anyhow::Result<Option<Vec<u8>>> {
let pool = self.pool.clone();
let input = input.to_string();
let mut conn = pool.connection_tagged("da_dispatcher").await?;
let mut conn = pool.connection_tagged("eigen_client").await?;
let batch = conn
.data_availability_dal()
.get_blob_data_by_blob_id(&input)
Expand Down

0 comments on commit fe8f68c

Please sign in to comment.