Skip to content

Commit

Permalink
Serge/liquidator split tcs and liquidation (#914)
Browse files Browse the repository at this point in the history
liquidator: split TCS triggering and liquidation job

- Concurrent execution of candidate lookup and tx building/sending
- Also added a health assertion IX to protect liqor in multi liquidation scenario
- And a timeout for jupiter v6 queries (avoid blocking liquidation because of slow TCS)
  • Loading branch information
farnyser authored Mar 20, 2024
1 parent 769f940 commit f54bb6f
Show file tree
Hide file tree
Showing 14 changed files with 1,046 additions and 393 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bin/liquidator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ tokio-stream = { version = "0.1.9"}
tokio-tungstenite = "0.16.1"
tracing = "0.1"
regex = "1.9.5"
hdrhistogram = "7.5.4"
hdrhistogram = "7.5.4"
indexmap = "2.0.0"
20 changes: 20 additions & 0 deletions bin/liquidator/src/cli_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ pub struct Cli {
#[clap(long, env, value_enum, default_value = "true")]
pub(crate) take_tcs: BoolArg,

#[clap(long, env, default_value = "30")]
pub(crate) tcs_refresh_timeout_secs: u64,

#[clap(long, env, default_value = "1000")]
pub(crate) tcs_check_interval_ms: u64,

/// profit margin at which to take tcs orders
#[clap(long, env, default_value = "0.0005")]
pub(crate) tcs_profit_fraction: f64,
Expand Down Expand Up @@ -178,6 +184,10 @@ pub struct Cli {
#[clap(long, env, default_value = "https://quote-api.jup.ag/v6")]
pub(crate) jupiter_v6_url: String,

/// override the jupiter http request timeout
#[clap(long, env, default_value = "30")]
pub(crate) jupiter_timeout_secs: u64,

/// provide a jupiter token, currently only for jup v6
#[clap(long, env, default_value = "")]
pub(crate) jupiter_token: String,
Expand All @@ -191,6 +201,12 @@ pub struct Cli {
#[clap(long, env, value_enum, default_value = "true")]
pub(crate) telemetry: BoolArg,

/// if liquidation is enabled
///
/// might be used to run an instance of liquidator dedicated to TCS and another one for liquidation
#[clap(long, env, value_enum, default_value = "true")]
pub(crate) liquidation_enabled: BoolArg,

/// liquidation refresh timeout in secs
#[clap(long, env, default_value = "30")]
pub(crate) liquidation_refresh_timeout_secs: u8,
Expand All @@ -216,4 +232,8 @@ pub struct Cli {
/// how long should it wait before logging an oracle error again (for the same token)
#[clap(long, env, default_value = "30")]
pub(crate) skip_oracle_error_in_logs_duration_secs: u64,

/// max number of liquidation/tcs to do concurrently
#[clap(long, env, default_value = "5")]
pub(crate) max_parallel_operations: u64,
}
47 changes: 43 additions & 4 deletions bin/liquidator/src/liquidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use mango_v4_client::{chain_data, MangoClient, PreparedInstructions};
use solana_sdk::signature::Signature;

use futures::{stream, StreamExt, TryStreamExt};
use mango_v4::accounts_ix::HealthCheckKind::MaintRatio;
use rand::seq::SliceRandom;
use tracing::*;
use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
Expand Down Expand Up @@ -260,7 +261,22 @@ impl<'a> LiquidateHelper<'a> {
)
.await
.context("creating perp_liq_base_or_positive_pnl_instruction")?;

liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix);

let liqor = &self.client.mango_account().await?;
liq_ixs.append(
self.client
.health_check_instruction(
liqor,
self.config.min_health_ratio,
vec![],
vec![*perp_market_index],
MaintRatio,
)
.await?,
);

let txsig = self
.client
.send_and_confirm_owner_tx(liq_ixs.to_instructions())
Expand Down Expand Up @@ -501,6 +517,20 @@ impl<'a> LiquidateHelper<'a> {
.await
.context("creating liq_token_with_token ix")?;
liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix);

let liqor = self.client.mango_account().await?;
liq_ixs.append(
self.client
.health_check_instruction(
&liqor,
self.config.min_health_ratio,
vec![asset_token_index, liab_token_index],
vec![],
MaintRatio,
)
.await?,
);

let txsig = self
.client
.send_and_confirm_owner_tx(liq_ixs.to_instructions())
Expand Down Expand Up @@ -651,14 +681,11 @@ impl<'a> LiquidateHelper<'a> {
}

#[allow(clippy::too_many_arguments)]
pub async fn maybe_liquidate_account(
pub async fn can_liquidate_account(
mango_client: &MangoClient,
account_fetcher: &chain_data::AccountFetcher,
pubkey: &Pubkey,
config: &Config,
) -> anyhow::Result<bool> {
let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio);

let account = account_fetcher.fetch_mango_account(pubkey)?;
let health_cache = mango_client
.health_cache(&account)
Expand All @@ -675,6 +702,18 @@ pub async fn maybe_liquidate_account(
"possible candidate",
);

Ok(true)
}

#[allow(clippy::too_many_arguments)]
pub async fn maybe_liquidate_account(
mango_client: &MangoClient,
account_fetcher: &chain_data::AccountFetcher,
pubkey: &Pubkey,
config: &Config,
) -> anyhow::Result<bool> {
let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio);

// Fetch a fresh account and re-compute
// This is -- unfortunately -- needed because the websocket streams seem to not
// be great at providing timely updates to the account data.
Expand Down
238 changes: 238 additions & 0 deletions bin/liquidator/src/liquidation_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
use crate::cli_args::Cli;
use crate::metrics::Metrics;
use crate::unwrappable_oracle_error::UnwrappableOracleError;
use crate::{liquidate, LiqErrorType, SharedState};
use anchor_lang::prelude::Pubkey;
use itertools::Itertools;
use mango_v4::state::TokenIndex;
use mango_v4_client::error_tracking::ErrorTracking;
use mango_v4_client::{chain_data, MangoClient, MangoClientError};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tracing::{error, trace, warn};

/// Shared context for the liquidation job: client handles, config, and
/// error-rate tracking used to throttle repeated failures.
///
/// Cheap to clone: every field is behind an `Arc`.
#[derive(Clone)]
pub struct LiquidationState {
    pub mango_client: Arc<MangoClient>,
    pub account_fetcher: Arc<chain_data::AccountFetcher>,
    pub liquidation_config: liquidate::Config,

    /// Per-account error tracking; accounts with too many recent errors are
    /// temporarily skipped (see `should_skip_execution`).
    pub errors: Arc<RwLock<ErrorTracking<Pubkey, LiqErrorType>>>,
    /// Per-token oracle error tracking, used to rate-limit oracle warnings.
    pub oracle_errors: Arc<RwLock<ErrorTracking<TokenIndex, LiqErrorType>>>,
}

impl LiquidationState {
    /// Scan the given accounts (in random order) and invoke `action` for each
    /// one that currently looks liquidatable.
    ///
    /// Returns the number of candidates `action` was called for. An error from
    /// `action` aborts the scan; per-account check errors are recorded in the
    /// error tracker and otherwise ignored.
    async fn find_candidates(
        &mut self,
        accounts_iter: impl Iterator<Item = &Pubkey>,
        action: impl Fn(Pubkey) -> anyhow::Result<()>,
    ) -> anyhow::Result<u64> {
        let mut found_counter = 0u64;
        use rand::seq::SliceRandom;

        let mut accounts = accounts_iter.collect::<Vec<&Pubkey>>();
        {
            // Randomize scan order so multiple liquidator instances don't all
            // contend for the same accounts in the same sequence.
            let mut rng = rand::thread_rng();
            accounts.shuffle(&mut rng);
        }

        for pubkey in accounts {
            if self.should_skip_execution(pubkey) {
                continue;
            }

            let result =
                liquidate::can_liquidate_account(&self.mango_client, &self.account_fetcher, pubkey)
                    .await;

            self.log_or_ignore_error(&result, pubkey);

            if result.unwrap_or(false) {
                action(*pubkey)?;
                found_counter += 1;
            }
        }

        Ok(found_counter)
    }

    /// Returns true when `pubkey` had too many recent errors and should be
    /// skipped for now (backoff is managed by `ErrorTracking`).
    fn should_skip_execution(&mut self, pubkey: &Pubkey) -> bool {
        let now = Instant::now();
        let error_tracking = &mut self.errors;

        // Skip a pubkey if there've been too many errors recently
        if let Some(error_entry) =
            error_tracking
                .read()
                .unwrap()
                .had_too_many_errors(LiqErrorType::Liq, pubkey, now)
        {
            trace!(
                %pubkey,
                error_entry.count,
                "skip checking account for liquidation, had errors recently",
            );
            return true;
        }

        false
    }

    /// Record the outcome of a liquidation attempt for `pubkey`.
    ///
    /// - Oracle errors are tracked per token (rate-limited warning) and do not
    ///   count against the account itself.
    /// - Other errors are tracked per account; expected liqee-precondition
    ///   simulation failures are logged at trace level only.
    /// - On success, the account's error history is cleared.
    fn log_or_ignore_error<T>(&mut self, result: &anyhow::Result<T>, pubkey: &Pubkey) {
        let error_tracking = &mut self.errors;

        if let Err(err) = result.as_ref() {
            if let Some((ti, ti_name)) = err.try_unwrap_oracle_error() {
                // Warn only while the token is below the error threshold, so
                // a persistently broken oracle doesn't spam the logs.
                if self
                    .oracle_errors
                    .read()
                    .unwrap()
                    .had_too_many_errors(LiqErrorType::Liq, &ti, Instant::now())
                    .is_none()
                {
                    warn!(
                        "{:?} recording oracle error for token {} {}",
                        chrono::offset::Utc::now(),
                        ti_name,
                        ti
                    );
                }

                self.oracle_errors
                    .write()
                    .unwrap()
                    .record(LiqErrorType::Liq, &ti, err.to_string());
                return;
            }

            // Keep track of pubkeys that had errors
            error_tracking
                .write()
                .unwrap()
                .record(LiqErrorType::Liq, pubkey, err.to_string());

            // Not all errors need to be raised to the user's attention.
            let mut is_error = true;

            // Simulation errors due to liqee precondition failures on the liquidation instructions
            // will commonly happen if our liquidator is late or if there are chain forks.
            match err.downcast_ref::<MangoClientError>() {
                Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => {
                    if logs.iter().any(|line| {
                        line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt")
                    }) {
                        is_error = false;
                    }
                }
                _ => {}
            };
            if is_error {
                error!("liquidating account {}: {:?}", pubkey, err);
            } else {
                trace!("liquidating account {}: {:?}", pubkey, err);
            }
        } else {
            error_tracking
                .write()
                .unwrap()
                .clear(LiqErrorType::Liq, pubkey);
        }
    }

    /// Attempt to liquidate `pubkey`, honoring the error-based skip list and
    /// recording the outcome. Returns Ok(true) when a liquidation was done.
    pub async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result<bool> {
        if self.should_skip_execution(pubkey) {
            return Ok(false);
        }

        let result = liquidate::maybe_liquidate_account(
            &self.mango_client,
            &self.account_fetcher,
            pubkey,
            &self.liquidation_config,
        )
        .await;

        self.log_or_ignore_error(&result, pubkey);
        result
    }
}

/// Spawn the background task that periodically scans all known mango accounts
/// for liquidation candidates.
///
/// Each found candidate is inserted into
/// `shared_state.liquidation_candidates_accounts`; a newly inserted candidate
/// also pings `tx_trigger_sender` so the tx-execution job wakes up. Latency
/// metrics are reported from the oldest chain-event reception time to the end
/// of each scan pass.
pub fn spawn_liquidation_job(
    cli: &Cli,
    shared_state: &Arc<RwLock<SharedState>>,
    tx_trigger_sender: async_channel::Sender<()>,
    mut liquidation: Box<LiquidationState>,
    metrics: &Metrics,
) -> JoinHandle<()> {
    tokio::spawn({
        let mut interval =
            mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms));
        let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into());
        let mut metric_liquidation_start_end =
            metrics.register_latency("liquidation_start_end".into());

        // Set at the start of a scan pass, cleared at the bottom of the loop.
        let mut liquidation_start_time = None;

        let shared_state = shared_state.clone();
        async move {
            loop {
                interval.tick().await;

                // Snapshot the account list under the write lock, then release
                // it before the (slow, awaited) candidate scan below.
                let account_addresses = {
                    let mut state = shared_state.write().unwrap();
                    if !state.one_snapshot_done {
                        // discard first latency info as it will skew data too much
                        state.oldest_chain_event_reception_time = None;
                        continue;
                    }
                    if state.oldest_chain_event_reception_time.is_none()
                        && liquidation_start_time.is_none()
                    {
                        // no new update, skip computing
                        continue;
                    }

                    state.mango_accounts.iter().cloned().collect_vec()
                };

                // Age out old entries so previously-failing accounts become
                // eligible again.
                liquidation.errors.write().unwrap().update();
                liquidation.oracle_errors.write().unwrap().update();

                if liquidation_start_time.is_none() {
                    liquidation_start_time = Some(Instant::now());
                }

                // NOTE(review): errors returned by the closure (e.g. a full
                // trigger channel) make this unwrap panic the job — presumably
                // intentional fail-fast; confirm.
                let found_candidates = liquidation
                    .find_candidates(account_addresses.iter(), |p| {
                        if shared_state
                            .write()
                            .unwrap()
                            .liquidation_candidates_accounts
                            .insert(p)
                        {
                            tx_trigger_sender.try_send(())?;
                        }

                        Ok(())
                    })
                    .await
                    .unwrap();

                if found_candidates > 0 {
                    tracing::debug!("found {} candidates for liquidation", found_candidates);
                }

                let mut state = shared_state.write().unwrap();
                // NOTE(review): this unwrap looks safe only because
                // liquidation_start_time is always None when the guard above is
                // evaluated (it is reset at the bottom of every pass), so
                // reaching here implies reception_time was Some — verify if the
                // guard condition ever changes.
                let reception_time = state.oldest_chain_event_reception_time.unwrap();
                let current_time = Instant::now();

                state.oldest_chain_event_reception_time = None;

                metric_liquidation_check.push(current_time - reception_time);
                metric_liquidation_start_end.push(current_time - liquidation_start_time.unwrap());
                liquidation_start_time = None;
            }
        }
    })
}
Loading

0 comments on commit f54bb6f

Please sign in to comment.