Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dex: fold Positions into the price indexes #4564

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/bin/pd/src/migrate.rs
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ pub enum Migration {
Testnet77,
/// Testnet-78 migration:
/// - Truncate various user-supplied `String` fields to a maximum length.
/// - Populate the DEX NV price idnexes with position data
Testnet78,
}

@@ -90,11 +91,12 @@ impl Migration {
Migration::Testnet78 => {
testnet78::migrate(storage, pd_home.clone(), genesis_start).await?
}
_ => unreachable!(),
// We keep historical migrations around for now, this will help inform an abstracted
// design. Feel free to remove it if it's causing you trouble.
_ => unimplemented!("the specified migration is unimplemented"),
}

if let Some(comet_home) = comet_home {
// TODO avoid this when refactoring to clean up migrations
let genesis_path = pd_home.join("genesis.json");
migrate_comet_data(comet_home, genesis_path).await?;
}
89 changes: 61 additions & 28 deletions crates/bin/pd/src/migrate/testnet78.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! Contains functions related to the migration script of Testnet78.
use anyhow::Context;
use cnidarium::{Snapshot, StateDelta, Storage};
use cnidarium::{Snapshot, StateDelta, StateWrite, Storage};
use futures::TryStreamExt as _;
use futures::{pin_mut, StreamExt};
use jmt::RootHash;
use penumbra_app::app::StateReadExt as _;
use penumbra_dex::component::PositionManager;
use penumbra_dex::lp::position;
use penumbra_dex::lp::position::Position;
use penumbra_governance::proposal_state::State as ProposalState;
use penumbra_governance::Proposal;
use penumbra_governance::StateReadExt as _;
use penumbra_governance::StateWriteExt;
use penumbra_governance::StateWriteExt as _;
use penumbra_proto::core::component::governance::v1 as pb_governance;
use penumbra_proto::{StateReadProto as _, StateWriteProto as _};
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_sct::component::clock::EpochManager;
use penumbra_sct::component::clock::EpochRead as _;
use penumbra_sct::component::clock::EpochRead;
use penumbra_stake::validator::Validator;
use std::path::PathBuf;
use tracing::instrument;
@@ -41,32 +44,36 @@ use crate::testnet::generate::TestnetConfig;
/// - `client_id` (128 bytes)
/// * Governance Signaling Proposals:
/// - `commit hash` (255 bytes)
/// - Close and re-open all *open* positions so that they are re-indexed.
#[instrument]
pub async fn migrate(
storage: Storage,
pd_home: PathBuf,
genesis_start: Option<tendermint::time::Time>,
) -> anyhow::Result<()> {
// Setup:
/* `Migration::prepare`: collect basic migration data, logging, initialize alt-storage if needed */
let initial_state = storage.latest_snapshot();

let chain_id = initial_state.get_chain_id().await?;
let root_hash = initial_state
.root_hash()
.await
.expect("chain state has a root hash");
let pre_upgrade_root_hash: RootHash = root_hash.into();

let pre_upgrade_height = initial_state
.get_block_height()
.await
.expect("chain state has a block height");
let post_upgrade_height = pre_upgrade_height.wrapping_add(1);

// We initialize a `StateDelta` and start by reaching into the JMT for all entries matching the
// swap execution prefix. Then, we write each entry to the nv-storage.
let pre_upgrade_root_hash: RootHash = root_hash.into();

/* `Migration::migrate`: reach into the chain state and perform an offline state transition */
let mut delta = StateDelta::new(initial_state);
tracing::info!("beginning migration steps");

let (migration_duration, post_upgrade_root_hash) = {
let start_time = std::time::SystemTime::now();

// Adjust the length of `Validator` fields.
truncate_validator_fields(&mut delta).await?;

@@ -76,30 +83,26 @@ pub async fn migrate(
// Adjust the length of governance proposal outcome fields.
truncate_proposal_outcome_fields(&mut delta).await?;

// Re-index all open positions.
reindex_dex_positions(&mut delta).await?;

// Reset the application height and halt flag.
delta.ready_to_start();
delta.put_block_height(0u64);

// Finally, commit the changes to the chain state.
let post_upgrade_root_hash = storage.commit_in_place(delta).await?;
tracing::info!(?post_upgrade_root_hash, "post-migration root hash");

(
start_time.elapsed().expect("start time not set"),
start_time.elapsed().expect("start is set"),
post_upgrade_root_hash,
)
};
tracing::info!("completed migration steps");

// Set halt bit to 0, so chain can start again.
let migrated_state = storage.latest_snapshot();
let mut delta = StateDelta::new(migrated_state);
delta.ready_to_start();
delta.put_block_height(0u64);
let _ = storage
.commit_in_place(delta)
.await
.context("failed to reset halt bit")?;
storage.release().await;

// The migration is complete, now we need to generate a genesis file. To do this, we need
// to lookup a validator view from the chain, and specify the post-upgrade app hash and
// initial height.
tracing::info!("migration completed, generating genesis and signing state...");

/* `Migration::complete`: the state transition has been performed, we prepare the checkpointed genesis and signing state */
let app_state = penumbra_app::genesis::Content {
chain_id,
..Default::default()
@@ -110,22 +113,26 @@ pub async fn migrate(
.to_vec()
.try_into()
.expect("infaillible conversion");

genesis.initial_height = post_upgrade_height as i64;
genesis.genesis_time = genesis_start.unwrap_or_else(|| {
let now = tendermint::time::Time::now();
tracing::info!(%now, "no genesis time provided, detecting a testing setup");
now
});

tracing::info!("generating checkpointed genesis");
let checkpoint = post_upgrade_root_hash.0.to_vec();
let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint));

tracing::info!("writing genesis to disk");
let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis");
tracing::info!("genesis: {}", genesis_json);
let genesis_path = pd_home.join("genesis.json");
std::fs::write(genesis_path, genesis_json).expect("can write genesis");

tracing::info!("updating signing state");
let validator_state_path = pd_home.join("priv_validator_state.json");

let fresh_validator_state = crate::testnet::generate::TestnetValidator::initial_state();
std::fs::write(validator_state_path, fresh_validator_state).expect("can write validator state");

@@ -135,12 +142,38 @@ pub async fn migrate(
?pre_upgrade_root_hash,
?post_upgrade_root_hash,
duration = migration_duration.as_secs(),
"successful migration!"
"migration fully complete"
);

Ok(())
}

async fn reindex_dex_positions(delta: &mut StateDelta<Snapshot>) -> anyhow::Result<()> {
tracing::info!("running dex re-indexing migration");
let prefix_key_lp = penumbra_dex::state_key::all_positions();
let stream_all_lp = delta.prefix::<Position>(&prefix_key_lp);
let stream_open_lp = stream_all_lp.filter_map(|entry| async {
match entry {
Ok((_, lp)) if lp.state == position::State::Opened => Some(lp),
_ => None,
}
});
pin_mut!(stream_open_lp);

while let Some(lp) = stream_open_lp.next().await {
// Re-hash the position, since the key is a bech32 string.
let id = lp.id();
// Close the position, adjusting all its index entries.
delta.close_position_by_id(&id).await?;
// Erase the position from the state, so that we circumvent the `update_position` guard.
delta.delete(penumbra_dex::state_key::position_by_id(&id));
// Open a position with the adjusted indexing logic.
delta.open_position(lp).await?;
}
tracing::info!("completed dex migration");
Ok(())
}

/// * Validators:
/// - `name` (140 bytes)
/// - `website` (70 bytes)
Original file line number Diff line number Diff line change
@@ -241,7 +241,7 @@ mod tests {

let position = buy_1;
state_tx
.update_position_by_price_index(&None, &position, &position.id())
.update_position_by_price_index(&position.id(), &None, &position)
.expect("can update price index");
state_tx.put(state_key::position_by_id(&id), position);

39 changes: 20 additions & 19 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
@@ -62,14 +62,15 @@ pub trait PositionRead: StateRead {
fn positions_by_price(
&self,
pair: &DirectedTradingPair,
) -> Pin<Box<dyn Stream<Item = Result<position::Id>> + Send + 'static>> {
) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
{
let prefix = engine::price_index::prefix(pair);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
self.nonverifiable_prefix_raw(&prefix)
self.nonverifiable_prefix(&prefix)
.map(|entry| match entry {
Ok((k, _)) => {
Ok((k, lp)) => {
let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
Ok(position::Id(raw_id))
Ok((position::Id(raw_id), lp))
}
Err(e) => Err(e),
})
@@ -90,12 +91,9 @@ pub trait PositionRead: StateRead {
async fn best_position(
&self,
pair: &DirectedTradingPair,
) -> Result<Option<position::Position>> {
) -> Result<Option<(position::Id, position::Position)>> {
let mut positions_by_price = self.positions_by_price(pair);
match positions_by_price.next().await.transpose()? {
Some(id) => self.position_by_id(&id).await,
None => Ok(None),
}
positions_by_price.next().await.transpose()
}

/// Fetch the list of pending position closures.
@@ -205,7 +203,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(id, Some(prev_state), new_state)
.await?;

Ok(())
}
@@ -280,7 +279,7 @@ pub trait PositionManager: StateWrite + PositionRead {

// Finally, record the new position state.
self.record_proto(event::position_open(&position));
self.update_position(None, position).await?;
self.update_position(&id, None, position).await?;

Ok(())
}
@@ -374,7 +373,8 @@ pub trait PositionManager: StateWrite + PositionRead {
.map_err(|e| tracing::warn!(?e, "failed to record position execution"))
.ok();

self.update_position(Some(prev_state), new_state).await
self.update_position(&position_id, Some(prev_state), new_state)
.await
}

/// Withdraw from a closed position, incrementing its sequence number.
@@ -450,7 +450,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(&position_id, Some(prev_state), new_state)
.await?;

Ok(reserves)
}
@@ -462,28 +463,28 @@ impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
trait Inner: StateWrite {
/// Writes a position to the state, updating all necessary indexes.
///
/// This should be the SOLE ENTRYPOINT for writing positions to the state.
/// This should be the **SOLE ENTRYPOINT** for writing positions to the state.
/// All other position changes exposed by the `PositionManager` should run through here.
#[instrument(level = "debug", skip_all)]
async fn update_position(
&mut self,
id: &position::Id,
prev_state: Option<Position>,
new_state: Position,
) -> Result<Position> {
let id = new_state.id();
tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");

// Assert `update_position` state transitions invariants:
Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;

// Update the DEX engine indices:
self.update_position_by_price_index(&prev_state, &new_state, &id)?;
self.update_position_by_inventory_index(&prev_state, &new_state, &id)?;
self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id)
self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
.await?;
self.update_trading_pair_position_counter(&prev_state, &new_state, &id)
self.update_trading_pair_position_counter(&prev_state, &new_state)
.await?;
self.update_position_by_price_index(&id, &prev_state, &new_state)?;

self.put(state_key::position_by_id(&id), new_state.clone());
Ok(new_state)
Original file line number Diff line number Diff line change
@@ -76,9 +76,9 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite {
/// │ └──┘
async fn update_asset_by_base_liquidity_index(
&mut self,
id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
id: &position::Id,
) -> Result<()> {
// We need to reconstruct the position's previous contribution and compute
// its new contribution to the index. We do this for each asset in the pair
Original file line number Diff line number Diff line change
@@ -41,7 +41,6 @@ pub(crate) trait PositionCounter: StateWrite {
&mut self,
prev_state: &Option<Position>,
new_state: &Position,
_id: &position::Id,
) -> Result<()> {
use position::State::*;
let trading_pair = new_state.phi.pair;
Original file line number Diff line number Diff line change
@@ -13,9 +13,9 @@ use position::State::*;
pub(super) trait PositionByInventoryIndex: StateWrite {
fn update_position_by_inventory_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record of the position, since changes to the
// reserves or the position state might have invalidated it.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use cnidarium::StateWrite;
use penumbra_proto::StateWriteProto;

use crate::{
lp::position::{self, Position},
@@ -12,9 +13,9 @@ use position::State::*;
pub(crate) trait PositionByPriceIndex: StateWrite {
fn update_position_by_price_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record for the position, since changes to the
// reserves or the position state might have invalidated it.
@@ -57,7 +58,10 @@ trait Inner: StateWrite {
end: pair.asset_2(),
};
let phi12 = phi.component.clone();
self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair12, &phi12, &id),
position.clone(),
);
tracing::debug!("indexing position for 1=>2 trades");
}

@@ -68,7 +72,10 @@ trait Inner: StateWrite {
end: pair.asset_1(),
};
let phi21 = phi.component.flip();
self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair21, &phi21, &id),
position.clone(),
);
tracing::debug!("indexing position for 2=>1 trades");
}
}
Loading