Skip to content

Commit

Permalink
fix: breaking solana dep update + wrapper struct
Browse files Browse the repository at this point in the history
  • Loading branch information
losman0s committed Nov 20, 2023
1 parent adf596b commit af3c87c
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 100 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[workspace]
members = ["programs/*", "clients/rust/*", "tools/*"]
exclude = ["observability/indexer"]
members = ["programs/*", "clients/rust/*", "tools/*", "observability/indexer"]

[workspace.dependencies]
solana-client = "1.14.13"
Expand All @@ -9,6 +8,9 @@ solana-logger = "1.14.13"
solana-program = "1.14.13"
solana-program-test = "1.14.13"
solana-account-decoder = "1.14.13"
solana-measure = "1.14.13"
solana-metrics = "1.14.13"
solana-transaction-status = "1.14.13"
anchor-lang = "0.26.0"
anchor-spl = "0.26.0"
anchor-client = "0.26.0"
Expand Down
5 changes: 5 additions & 0 deletions observability/indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
cargo-features = ["workspace-inheritance"]

[package]
name = "marginfi-v2-indexer"
version = "0.1.0"
Expand All @@ -7,6 +9,9 @@ edition = "2021"
name = "mfi-index"
path = "src/bin/main.rs"

[features]
mainnet-beta = ["marginfi/mainnet-beta"]

[dependencies]
solana-client = { workspace = true }
solana-measure = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions observability/indexer/src/commands/index_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ fn process_update(ctx: Arc<Context>, update: UpdateOneof) -> Result<()> {
UpdateOneof::Account(account_update) => {
let update_slot = account_update.slot;
if let Some(account_info) = account_update.account {
let address = Pubkey::try_from(account_info.pubkey.as_slice())?;
let address = &Pubkey::new(&account_info.pubkey);
let txn_signature = account_info
.txn_signature
.clone()
Expand All @@ -210,9 +210,9 @@ fn process_update(ctx: Arc<Context>, update: UpdateOneof) -> Result<()> {
};

slot_account_updates.insert(
address,
address.clone(),
AccountUpdateData {
address,
address: address.clone(),
timestamp: Utc::now(),
slot: update_slot,
txn_signature,
Expand Down
7 changes: 5 additions & 2 deletions observability/indexer/src/commands/snapshot_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ async fn process_update(ctx: Arc<Context>, update: UpdateOneof) -> Result<()> {
UpdateOneof::Account(account_update) => {
let update_slot = account_update.slot;
if let Some(account_info) = account_update.account {
let address = Pubkey::try_from(account_info.pubkey.clone()).unwrap();
let address = Pubkey::new(&account_info.pubkey);
let txn_signature = account_info
.txn_signature
.clone()
Expand Down Expand Up @@ -587,7 +587,10 @@ pub async fn push_transactions_to_bigquery(ctx: Arc<Context>) {
)
.await;
if let Err(error) = result {
warn!("Failed to write marginfi account metrics to bigquery: {}", error);
warn!(
"Failed to write marginfi account metrics to bigquery: {}",
error
);
}
}

Expand Down
173 changes: 173 additions & 0 deletions observability/indexer/src/utils/marginfi_account_dup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use fixed::types::I80F48;
use marginfi::{
constants::TOTAL_ASSET_VALUE_INIT_LIMIT_INACTIVE,
state::{
marginfi_account::{
calc_asset_value, Balance, BalanceSide, MarginfiAccount, RiskRequirementType,
WeightType,
},
marginfi_group::Bank,
price::{OraclePriceFeedAdapter, PriceAdapter},
},
};
use solana_sdk::pubkey::Pubkey;

pub struct BankAccountWithPriceFeed2 {
bank: Bank,
price_feed: OraclePriceFeedAdapter,
balance: Balance,
}

impl BankAccountWithPriceFeed2 {
pub fn load(
marginfi_account: &MarginfiAccount,
banks: &std::collections::HashMap<Pubkey, Bank>,
price_feeds: &std::collections::HashMap<Pubkey, OraclePriceFeedAdapter>,
) -> anyhow::Result<Vec<Self>> {
marginfi_account
.lending_account
.balances
.into_iter()
.filter(|balance| balance.active)
.enumerate()
.map(|(_, balance)| {
let bank = banks.get(&balance.bank_pk).cloned().unwrap();
let price_feed = price_feeds
.get(&bank.config.oracle_keys[0])
.cloned()
.unwrap();
Ok(BankAccountWithPriceFeed2 {
bank,
price_feed,
balance,
})
})
.collect::<anyhow::Result<Vec<_>>>()
}

#[inline(always)]
pub fn calc_weighted_assets_and_liabilities_values(
&self,
weight_type: WeightType,
) -> anyhow::Result<(I80F48, I80F48)> {
let (worst_price, best_price) = self.price_feed.get_price_range()?;
let (mut asset_weight, liability_weight) = self.bank.config.get_weights(weight_type);
let mint_decimals = self.bank.mint_decimals;

let asset_amount = self
.bank
.get_asset_amount(self.balance.asset_shares.into())?;
let liability_amount = self
.bank
.get_liability_amount(self.balance.liability_shares.into())?;

if matches!(weight_type, WeightType::Initial)
&& self.bank.config.total_asset_value_init_limit
!= TOTAL_ASSET_VALUE_INIT_LIMIT_INACTIVE
{
let bank_total_assets_value = calc_asset_value(
self.bank
.get_asset_amount(self.bank.total_asset_shares.into())?,
worst_price,
mint_decimals,
None,
)?;

let total_asset_value_init_limit =
I80F48::from_num(self.bank.config.total_asset_value_init_limit);

if bank_total_assets_value > total_asset_value_init_limit {
let discount = total_asset_value_init_limit
.checked_div(bank_total_assets_value)
.unwrap();

asset_weight = asset_weight.checked_mul(discount).unwrap();
}
}

Ok((
calc_asset_value(asset_amount, worst_price, mint_decimals, Some(asset_weight))?,
calc_asset_value(
liability_amount,
best_price,
mint_decimals,
Some(liability_weight),
)?,
))
}

#[inline]
pub fn is_empty(&self, side: BalanceSide) -> bool {
self.balance.is_empty(side)
}
}

pub struct RiskEngine2 {
bank_accounts_with_price: Vec<BankAccountWithPriceFeed2>,
}

impl RiskEngine2 {
pub fn load(
marginfi_account: &MarginfiAccount,
banks: &std::collections::HashMap<Pubkey, Bank>,
price_feeds: &std::collections::HashMap<Pubkey, OraclePriceFeedAdapter>,
) -> anyhow::Result<Self> {
Ok(Self {
bank_accounts_with_price: BankAccountWithPriceFeed2::load(
marginfi_account,
banks,
price_feeds,
)?,
})
}

/// Returns the total assets and liabilities of the account in the form of (assets, liabilities)
pub fn get_account_health_components(
&self,
requirement_type: RiskRequirementType,
) -> anyhow::Result<(I80F48, I80F48)> {
let mut total_assets = I80F48::ZERO;
let mut total_liabilities = I80F48::ZERO;

for a in &self.bank_accounts_with_price {
let (assets, liabilities) =
a.calc_weighted_assets_and_liabilities_values(requirement_type.to_weight_type())?;

total_assets = total_assets.checked_add(assets).unwrap();
total_liabilities = total_liabilities.checked_add(liabilities).unwrap();
}

Ok((total_assets, total_liabilities))
}

pub fn get_equity_components(&self) -> anyhow::Result<(I80F48, I80F48)> {
Ok(self
.bank_accounts_with_price
.iter()
.map(|a: &BankAccountWithPriceFeed2| {
a.calc_weighted_assets_and_liabilities_values(WeightType::Equity)
})
.try_fold(
(I80F48::ZERO, I80F48::ZERO),
|(total_assets, total_liabilities), res| {
let (assets, liabilities) = res?;
let total_assets_sum = total_assets.checked_add(assets).unwrap();
let total_liabilities_sum = total_liabilities.checked_add(liabilities).unwrap();

Ok::<_, anyhow::Error>((total_assets_sum, total_liabilities_sum))
},
)?)
}

pub fn get_account_health(
&self,
requirement_type: RiskRequirementType,
) -> anyhow::Result<I80F48> {
let (total_weighted_assets, total_weighted_liabilities) =
self.get_account_health_components(requirement_type)?;

Ok(total_weighted_assets
.checked_sub(total_weighted_liabilities)
.unwrap())
}
}
20 changes: 10 additions & 10 deletions observability/indexer/src/utils/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::marginfi_account_dup::RiskEngine2;
use crate::utils::big_query::DATE_FORMAT_STR;
use crate::utils::snapshot::{BankAccounts, OracleData, Snapshot};
use anyhow::anyhow;
Expand All @@ -8,7 +9,7 @@ use itertools::Itertools;
use marginfi::constants::ZERO_AMOUNT_THRESHOLD;
use marginfi::prelude::MarginfiGroup;
use marginfi::state::marginfi_account::{
calc_asset_value, MarginfiAccount, RiskEngine, RiskRequirementType, WeightType,
calc_asset_value, MarginfiAccount, RiskRequirementType, WeightType,
};
use marginfi::state::marginfi_group::BankOperationalState;
use marginfi::state::price::OraclePriceFeedAdapter;
Expand Down Expand Up @@ -470,22 +471,21 @@ impl MarginfiAccountMetrics {
}
}));

let risk_engine = RiskEngine::load_from_map(marginfi_account, &banks, &price_feeds)?;
let risk_engine = RiskEngine2::load(marginfi_account, &banks, &price_feeds)?;

let (total_assets_usd, total_liabilities_usd) =
risk_engine.get_equity_components(timestamp)?;
let (total_assets_usd, total_liabilities_usd) = risk_engine.get_equity_components()?;
let (total_assets_usd, total_liabilities_usd) = (
total_assets_usd.to_num::<f64>(),
total_liabilities_usd.to_num::<f64>(),
);
let (total_assets_usd_maintenance, total_liabilities_usd_maintenance) = risk_engine
.get_account_health_components(RiskRequirementType::Maintenance, timestamp)?;
let (total_assets_usd_maintenance, total_liabilities_usd_maintenance) =
risk_engine.get_account_health_components(RiskRequirementType::Maintenance)?;
let (total_assets_usd_maintenance, total_liabilities_usd_maintenance) = (
total_assets_usd_maintenance.to_num::<f64>(),
total_liabilities_usd_maintenance.to_num::<f64>(),
);
let (total_assets_usd_initial, total_liabilities_usd_initial) =
risk_engine.get_account_health_components(RiskRequirementType::Initial, timestamp)?;
risk_engine.get_account_health_components(RiskRequirementType::Initial)?;
let (total_assets_usd_initial, total_liabilities_usd_initial) = (
total_assets_usd_initial.to_num::<f64>(),
total_liabilities_usd_initial.to_num::<f64>(),
Expand All @@ -494,7 +494,7 @@ impl MarginfiAccountMetrics {
let positions = marginfi_account
.lending_account
.balances
.into_iter()
.iter()
.filter(|balance| balance.active)
.map(|balance| {
let bank = banks.get(&balance.bank_pk).unwrap();
Expand All @@ -514,8 +514,8 @@ impl MarginfiAccountMetrics {
.ok_or_else(|| {
anyhow!(
"Price feed {} not found for bank {}",
price_feed_pk,
balance.bank_pk
&price_feed_pk,
&balance.bank_pk
)
})
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions observability/indexer/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod big_query;
pub mod errors;
pub mod geyser_client;
pub mod marginfi_account_dup;
pub mod metrics;
pub mod protos;
pub mod snapshot;
Expand Down
26 changes: 10 additions & 16 deletions observability/indexer/src/utils/protos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ mod conversion {
) -> Result<Self, Self::Error> {
Ok(Self {
data: account_data_proto.data,
owner: Pubkey::try_from(account_data_proto.owner.as_slice())
.map_err(|_| GeyserServiceError::ProtoMessageConversionFailed)?,
owner: Pubkey::new(&account_data_proto.owner),
lamports: account_data_proto.lamports,
executable: account_data_proto.executable,
rent_epoch: account_data_proto.rent_epoch,
Expand Down Expand Up @@ -81,8 +80,7 @@ mod conversion {

fn try_from(lut_proto: super::MessageAddressTableLookup) -> Result<Self, Self::Error> {
Ok(Self {
account_key: Pubkey::try_from(lut_proto.account_key.as_slice())
.map_err(|_| GeyserServiceError::ProtoMessageConversionFailed)?,
account_key: Pubkey::new(&lut_proto.account_key),
writable_indexes: lut_proto.writable_indexes,
readonly_indexes: lut_proto.readonly_indexes,
})
Expand All @@ -98,10 +96,9 @@ mod conversion {
account_keys: message_proto
.account_keys
.iter()
.map(|address_bytes| Pubkey::try_from(address_bytes.as_slice()))
.map(|address_bytes| Pubkey::new(&address_bytes))
.into_iter()
.collect::<Result<_, _>>()
.map_err(|_| GeyserServiceError::ProtoMessageConversionFailed)?,
.collect(),
header: message_header_proto.into(),
recent_blockhash: Hash::new(&message_proto.recent_blockhash),
instructions: message_proto.instructions.into_iter().map_into().collect(),
Expand All @@ -120,10 +117,9 @@ mod conversion {
account_keys: message_proto
.account_keys
.iter()
.map(|address_bytes| Pubkey::try_from(address_bytes.as_slice()))
.map(|address_bytes| Pubkey::new(address_bytes))
.into_iter()
.collect::<Result<_, _>>()
.map_err(|_| GeyserServiceError::ProtoMessageConversionFailed)?,
.collect(),
recent_blockhash: Hash::new(&message_proto.recent_blockhash),
instructions: message_proto.instructions.into_iter().map_into().collect(),
address_table_lookups: message_proto
Expand Down Expand Up @@ -226,17 +222,15 @@ mod conversion {
let loaded_writable_addresses: Vec<Pubkey> = meta_proto
.loaded_writable_addresses
.iter()
.map(|address_bytes| Pubkey::try_from(address_bytes.as_slice()))
.map(|address_bytes| Pubkey::new(&address_bytes))
.into_iter()
.collect::<Result<_, _>>()
.map_err(|_| GeyserServiceError::ProtoMessageConversionFailed)?;
.collect();
let loaded_readable_addresses = meta_proto
.loaded_readonly_addresses
.iter()
.map(|address_bytes| Pubkey::try_from(address_bytes.as_slice()))
.map(|address_bytes| Pubkey::new(&address_bytes))
.into_iter()
.collect::<Result<_, _>>()
.map_err(|_| GeyserServiceError::ProtoMessageConversionFailed)?;
.collect();

Ok(Self {
status: match meta_proto.err {
Expand Down
Loading

0 comments on commit af3c87c

Please sign in to comment.