Skip to content

Commit

Permalink
liq: get rid of separate rebalance job (#815)
Browse files Browse the repository at this point in the history
Previously, the separate job and the post-liquidation rebalance could
run at the same time and would occasionally perform the same action at
the same time, leading to overshooting.

Now rebalancing never happens twice. In the future it should potentially
just run separately from liquidation, but that needs a review of the
assumptions the liquidation job is making first.

(cherry picked from commit e8e7e44)
  • Loading branch information
ckamm committed Dec 13, 2023
1 parent 9ba0004 commit 47faa8a
Showing 1 changed file with 12 additions and 30 deletions.
42 changes: 12 additions & 30 deletions bin/liquidator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ async fn main() -> anyhow::Result<()> {
min_buy_fraction: 0.7,
};

let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5));
let rebalance_config = rebalance::Config {
enabled: cli.rebalance == BoolArg::True,
slippage_bps: cli.rebalance_slippage_bps,
Expand Down Expand Up @@ -434,30 +433,13 @@ async fn main() -> anyhow::Result<()> {
// Could be refactored to only start the below jobs when the first snapshot is done.
// But need to take care to abort if the above job aborts beforehand.

let rebalance_job = tokio::spawn({
let shared_state = shared_state.clone();
async move {
loop {
rebalance_interval.tick().await;
if !shared_state.read().unwrap().one_snapshot_done {
continue;
}
if let Err(err) = rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);

// Workaround: We really need a sequence enforcer in the liquidator since we don't want to
// accidentally send a similar tx again when we incorrectly believe an earlier one got forked
// off. For now, hard sleep on error to avoid the most frequent error cases.
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
});

let liquidation_job = tokio::spawn({
let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms));
let shared_state = shared_state.clone();
async move {
let mut must_rebalance = true;
let rebalance_delay = Duration::from_secs(5);
let mut last_rebalance = Instant::now();
loop {
interval.tick().await;

Expand All @@ -469,6 +451,14 @@ async fn main() -> anyhow::Result<()> {
state.mango_accounts.iter().cloned().collect_vec()
};

if must_rebalance || last_rebalance.elapsed() > rebalance_delay {
if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
}
must_rebalance = false;
last_rebalance = Instant::now();
}

liquidation.log_persistent_errors();

let liquidated = liquidation
Expand All @@ -484,14 +474,7 @@ async fn main() -> anyhow::Result<()> {
.unwrap();
}

if liquidated || took_tcs {
// It's awkward that this rebalance can run in parallel with the one
// from the rebalance_job. Ideally we'd get only one at a time/in quick succession.
// However, we do want to rebalance after a liquidation before liquidating further.
if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
}
}
must_rebalance = must_rebalance || liquidated || took_tcs;
}
}
});
Expand Down Expand Up @@ -549,7 +532,6 @@ async fn main() -> anyhow::Result<()> {
use futures::StreamExt;
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
data_job,
rebalance_job,
liquidation_job,
token_swap_info_job,
check_changes_for_abort_job,
Expand Down

0 comments on commit 47faa8a

Please sign in to comment.