Skip to content

Commit

Permalink
feat(Base token): add cbt metrics (#2720)
Browse files Browse the repository at this point in the history
* Add cbt-related metrics;
* Move last hardcoded cbt-related properties to the config.
  • Loading branch information
ischasny authored Aug 23, 2024
1 parent d9266e5 commit 58438eb
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 58 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions core/lib/config/src/configs/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const DEFAULT_L1_TX_SENDING_MAX_ATTEMPTS: u32 = 3;
/// Default number of milliseconds to sleep between receipt checking attempts
const DEFAULT_L1_RECEIPT_CHECKING_SLEEP_MS: u64 = 30_000;

/// Default maximum number of attempts to fetch price from a remote API
const DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS: u32 = 3;

/// Default number of milliseconds to sleep between price fetching attempts
const DEFAULT_PRICE_FETCHING_SLEEP_MS: u64 = 5_000;

/// Default number of milliseconds to sleep between transaction sending attempts
const DEFAULT_L1_TX_SENDING_SLEEP_MS: u64 = 30_000;

Expand Down Expand Up @@ -73,6 +79,14 @@ pub struct BaseTokenAdjusterConfig {
#[serde(default = "BaseTokenAdjusterConfig::default_l1_tx_sending_sleep_ms")]
pub l1_tx_sending_sleep_ms: u64,

/// Maximum number of attempts to fetch quote from a remote API before failing over
#[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_max_attempts")]
pub price_fetching_max_attempts: u32,

/// Number of seconds to sleep between price fetching attempts
#[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_sleep_ms")]
pub price_fetching_sleep_ms: u64,

/// Defines whether base_token_adjuster should halt the process if there was an error while
/// fetching or persisting the quote. Generally that should be set to false to not to halt
/// the server process if an external api is not available or if L1 is congested.
Expand All @@ -93,6 +107,8 @@ impl Default for BaseTokenAdjusterConfig {
l1_receipt_checking_sleep_ms: Self::default_l1_receipt_checking_sleep_ms(),
l1_tx_sending_max_attempts: Self::default_l1_tx_sending_max_attempts(),
l1_tx_sending_sleep_ms: Self::default_l1_tx_sending_sleep_ms(),
price_fetching_sleep_ms: Self::default_price_fetching_sleep_ms(),
price_fetching_max_attempts: Self::default_price_fetching_max_attempts(),
halt_on_error: Self::default_halt_on_error(),
}
}
Expand Down Expand Up @@ -135,6 +151,10 @@ impl BaseTokenAdjusterConfig {
Duration::from_millis(self.l1_tx_sending_sleep_ms)
}

pub fn price_fetching_sleep_duration(&self) -> Duration {
Duration::from_millis(self.price_fetching_sleep_ms)
}

pub fn default_l1_receipt_checking_max_attempts() -> u32 {
DEFAULT_L1_RECEIPT_CHECKING_MAX_ATTEMPTS
}
Expand All @@ -151,6 +171,14 @@ impl BaseTokenAdjusterConfig {
DEFAULT_L1_TX_SENDING_SLEEP_MS
}

pub fn default_price_fetching_sleep_ms() -> u64 {
DEFAULT_PRICE_FETCHING_SLEEP_MS
}

pub fn default_price_fetching_max_attempts() -> u32 {
DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS
}

pub fn default_max_tx_gas() -> u64 {
DEFAULT_MAX_TX_GAS
}
Expand Down
2 changes: 2 additions & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,8 @@ impl Distribution<configs::base_token_adjuster::BaseTokenAdjusterConfig> for Enc
l1_receipt_checking_sleep_ms: self.sample(rng),
l1_tx_sending_max_attempts: self.sample(rng),
l1_tx_sending_sleep_ms: self.sample(rng),
price_fetching_max_attempts: self.sample(rng),
price_fetching_sleep_ms: self.sample(rng),
halt_on_error: self.sample(rng),
}
}
Expand Down
8 changes: 8 additions & 0 deletions core/lib/env_config/src/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ mod tests {
l1_receipt_checking_sleep_ms: 20_000,
l1_tx_sending_max_attempts: 10,
l1_tx_sending_sleep_ms: 30_000,
price_fetching_max_attempts: 20,
price_fetching_sleep_ms: 10_000,
halt_on_error: true,
}
}
Expand All @@ -41,6 +43,8 @@ mod tests {
l1_receipt_checking_sleep_ms: 30_000,
l1_tx_sending_max_attempts: 3,
l1_tx_sending_sleep_ms: 30_000,
price_fetching_max_attempts: 3,
price_fetching_sleep_ms: 5_000,
halt_on_error: false,
}
}
Expand All @@ -58,6 +62,8 @@ mod tests {
BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS=20000
BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS=10
BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS=30000
BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS=20
BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS=10000
BASE_TOKEN_ADJUSTER_HALT_ON_ERROR=true
"#;
lock.set_env(config);
Expand All @@ -79,6 +85,8 @@ mod tests {
"BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS",
"BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS",
"BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_HALT_ON_ERROR",
]);

Expand Down
8 changes: 8 additions & 0 deletions core/lib/protobuf_config/src/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ impl ProtoRepr for proto::BaseTokenAdjuster {
l1_receipt_checking_max_attempts: self
.l1_receipt_checking_max_attempts
.unwrap_or(Self::Type::default_l1_receipt_checking_max_attempts()),
price_fetching_sleep_ms: self
.price_fetching_sleep_ms
.unwrap_or(Self::Type::default_price_fetching_sleep_ms()),
price_fetching_max_attempts: self
.price_fetching_max_attempts
.unwrap_or(Self::Type::default_price_fetching_max_attempts()),
l1_tx_sending_max_attempts: self
.l1_tx_sending_max_attempts
.unwrap_or(Self::Type::default_l1_tx_sending_max_attempts()),
Expand All @@ -47,6 +53,8 @@ impl ProtoRepr for proto::BaseTokenAdjuster {
l1_receipt_checking_max_attempts: Some(this.l1_receipt_checking_max_attempts),
l1_tx_sending_max_attempts: Some(this.l1_tx_sending_max_attempts),
l1_tx_sending_sleep_ms: Some(this.l1_tx_sending_sleep_ms),
price_fetching_max_attempts: Some(this.price_fetching_max_attempts),
price_fetching_sleep_ms: Some(this.price_fetching_sleep_ms),
max_tx_gas: Some(this.max_tx_gas),
default_priority_fee_per_gas: Some(this.default_priority_fee_per_gas),
max_acceptable_priority_fee_in_gwei: Some(this.max_acceptable_priority_fee_in_gwei),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ message BaseTokenAdjuster {
optional uint32 l1_tx_sending_max_attempts = 8;
optional uint64 l1_tx_sending_sleep_ms = 9;
optional bool halt_on_error = 10;
optional uint32 price_fetching_max_attempts = 11;
optional uint64 price_fetching_sleep_ms = 12;
}
3 changes: 2 additions & 1 deletion core/node/base_token_adjuster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ zksync_external_price_api.workspace = true
zksync_contracts.workspace = true
zksync_eth_client.workspace = true
zksync_node_fee_model.workspace = true

zksync_utils.workspace = true
vise.workspace = true

tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
Expand Down
150 changes: 93 additions & 57 deletions core/node/base_token_adjuster/src/base_token_ratio_persister.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cmp::max, fmt::Debug, sync::Arc, time::Duration};
use std::{cmp::max, fmt::Debug, sync::Arc, time::Instant};

use anyhow::Context as _;
use tokio::{sync::watch, time::sleep};
Expand All @@ -14,6 +14,8 @@ use zksync_types::{
Address, U256,
};

use crate::metrics::{OperationResult, OperationResultLabels, METRICS};

#[derive(Debug, Clone)]
pub struct BaseTokenRatioPersisterL1Params {
pub eth_client: Box<dyn BoundEthInterface>,
Expand Down Expand Up @@ -82,47 +84,7 @@ impl BaseTokenRatioPersister {
// TODO(PE-148): Consider shifting retry upon adding external API redundancy.
let new_ratio = self.retry_fetch_ratio().await?;
self.persist_ratio(new_ratio).await?;

let Some(l1_params) = &self.l1_params else {
return Ok(());
};

let max_attempts = self.config.l1_tx_sending_max_attempts;
let sleep_duration = self.config.l1_tx_sending_sleep_duration();
let mut result: anyhow::Result<()> = Ok(());
let mut prev_base_fee_per_gas: Option<u64> = None;
let mut prev_priority_fee_per_gas: Option<u64> = None;

for attempt in 0..max_attempts {
let (base_fee_per_gas, priority_fee_per_gas) =
self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas);

result = self
.send_ratio_to_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas)
.await;
if let Some(err) = result.as_ref().err() {
tracing::info!(
"Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}",
attempt + 1,
base_fee_per_gas,
priority_fee_per_gas,
err
);
tokio::time::sleep(sleep_duration).await;
prev_base_fee_per_gas = Some(base_fee_per_gas);
prev_priority_fee_per_gas = Some(priority_fee_per_gas);
} else {
tracing::info!(
"Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}",
new_ratio.numerator.get(),
new_ratio.denominator.get(),
base_fee_per_gas,
priority_fee_per_gas
);
return result;
}
}
result
self.retry_update_ratio_on_l1(new_ratio).await
}

fn get_eth_fees(
Expand Down Expand Up @@ -157,36 +119,110 @@ impl BaseTokenRatioPersister {
(base_fee_per_gas, priority_fee_per_gas)
}

async fn retry_update_ratio_on_l1(&self, new_ratio: BaseTokenAPIRatio) -> anyhow::Result<()> {
let Some(l1_params) = &self.l1_params else {
return Ok(());
};

let max_attempts = self.config.l1_tx_sending_max_attempts;
let sleep_duration = self.config.l1_tx_sending_sleep_duration();
let mut prev_base_fee_per_gas: Option<u64> = None;
let mut prev_priority_fee_per_gas: Option<u64> = None;
let mut last_error = None;
for attempt in 0..max_attempts {
let (base_fee_per_gas, priority_fee_per_gas) =
self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas);

let start_time = Instant::now();
let result = self
.update_ratio_on_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas)
.await;

match result {
Ok(x) => {
tracing::info!(
"Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}",
new_ratio.numerator.get(),
new_ratio.denominator.get(),
base_fee_per_gas,
priority_fee_per_gas
);
METRICS
.l1_gas_used
.set(x.unwrap_or(U256::zero()).low_u128() as u64);
METRICS.l1_update_latency[&OperationResultLabels {
result: OperationResult::Success,
}]
.observe(start_time.elapsed());

return Ok(());
}
Err(err) => {
tracing::info!(
"Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}",
attempt,
base_fee_per_gas,
priority_fee_per_gas,
err
);
METRICS.l1_update_latency[&OperationResultLabels {
result: OperationResult::Failure,
}]
.observe(start_time.elapsed());

tokio::time::sleep(sleep_duration).await;
prev_base_fee_per_gas = Some(base_fee_per_gas);
prev_priority_fee_per_gas = Some(priority_fee_per_gas);
last_error = Some(err)
}
}
}

let error_message = "Failed to update base token multiplier on L1";
Err(last_error
.map(|x| x.context(error_message))
.unwrap_or_else(|| anyhow::anyhow!(error_message)))
}

async fn retry_fetch_ratio(&self) -> anyhow::Result<BaseTokenAPIRatio> {
let sleep_duration = Duration::from_secs(1);
let max_retries = 5;
let mut attempts = 0;
let sleep_duration = self.config.price_fetching_sleep_duration();
let max_retries = self.config.price_fetching_max_attempts;
let mut last_error = None;

loop {
for attempt in 0..max_retries {
let start_time = Instant::now();
match self
.price_api_client
.fetch_ratio(self.base_token_address)
.await
{
Ok(ratio) => {
METRICS.external_price_api_latency[&OperationResultLabels {
result: OperationResult::Success,
}]
.observe(start_time.elapsed());
return Ok(ratio);
}
Err(err) if attempts < max_retries => {
attempts += 1;
Err(err) => {
tracing::warn!(
"Attempt {}/{} to fetch ratio from coingecko failed with err: {}. Retrying...",
attempts,
"Attempt {}/{} to fetch ratio from external price api failed with err: {}. Retrying...",
attempt,
max_retries,
err
);
last_error = Some(err);
METRICS.external_price_api_latency[&OperationResultLabels {
result: OperationResult::Failure,
}]
.observe(start_time.elapsed());
sleep(sleep_duration).await;
}
Err(err) => {
return Err(err)
.context("Failed to fetch base token ratio after multiple attempts");
}
}
}
let error_message = "Failed to fetch base token ratio after multiple attempts";
Err(last_error
.map(|x| x.context(error_message))
.unwrap_or_else(|| anyhow::anyhow!(error_message)))
}

async fn persist_ratio(&self, api_ratio: BaseTokenAPIRatio) -> anyhow::Result<usize> {
Expand All @@ -209,13 +245,13 @@ impl BaseTokenRatioPersister {
Ok(id)
}

async fn send_ratio_to_l1(
async fn update_ratio_on_l1(
&self,
l1_params: &BaseTokenRatioPersisterL1Params,
api_ratio: BaseTokenAPIRatio,
base_fee_per_gas: u64,
priority_fee_per_gas: u64,
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<U256>> {
let fn_set_token_multiplier = l1_params
.chain_admin_contract
.function("setTokenMultiplier")
Expand Down Expand Up @@ -276,7 +312,7 @@ impl BaseTokenRatioPersister {
.context("failed getting receipt for `setTokenMultiplier` transaction")?;
if let Some(receipt) = maybe_receipt {
if receipt.status == Some(1.into()) {
return Ok(());
return Ok(receipt.gas_used);
}
return Err(anyhow::Error::msg(format!(
"`setTokenMultiplier` transaction {:?} failed with status {:?}",
Expand Down
1 change: 1 addition & 0 deletions core/node/base_token_adjuster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub use self::{

mod base_token_ratio_persister;
mod base_token_ratio_provider;
mod metrics;
Loading

0 comments on commit 58438eb

Please sign in to comment.