Skip to content

Commit b584c26

Browse files
erwanorconorsch
authored andcommitted
dex: augment price indexes with position data
1 parent 6f47537 commit b584c26

File tree

13 files changed

+144
-113
lines changed

13 files changed

+144
-113
lines changed

crates/bin/pd/src/migrate.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub enum Migration {
5050
Testnet77,
5151
/// Testnet-78 migration:
5252
/// - Truncate various user-supplied `String` fields to a maximum length.
53+
/// - Populate the DEX NV price idnexes with position data
5354
Testnet78,
5455
}
5556

@@ -90,11 +91,12 @@ impl Migration {
9091
Migration::Testnet78 => {
9192
testnet78::migrate(storage, pd_home.clone(), genesis_start).await?
9293
}
93-
_ => unreachable!(),
94+
// We keep historical migrations around for now, this will help inform an abstracted
95+
// design. Feel free to remove it if it's causing you trouble.
96+
_ => unimplemented!("the specified migration is unimplemented"),
9497
}
9598

9699
if let Some(comet_home) = comet_home {
97-
// TODO avoid this when refactoring to clean up migrations
98100
let genesis_path = pd_home.join("genesis.json");
99101
migrate_comet_data(comet_home, genesis_path).await?;
100102
}

crates/bin/pd/src/migrate/testnet78.rs

+61-28
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
//! Contains functions related to the migration script of Testnet78.
2-
use anyhow::Context;
3-
use cnidarium::{Snapshot, StateDelta, Storage};
2+
use cnidarium::{Snapshot, StateDelta, StateWrite, Storage};
43
use futures::TryStreamExt as _;
4+
use futures::{pin_mut, StreamExt};
55
use jmt::RootHash;
66
use penumbra_app::app::StateReadExt as _;
7+
use penumbra_dex::component::PositionManager;
8+
use penumbra_dex::lp::position;
9+
use penumbra_dex::lp::position::Position;
710
use penumbra_governance::proposal_state::State as ProposalState;
811
use penumbra_governance::Proposal;
912
use penumbra_governance::StateReadExt as _;
10-
use penumbra_governance::StateWriteExt;
13+
use penumbra_governance::StateWriteExt as _;
1114
use penumbra_proto::core::component::governance::v1 as pb_governance;
12-
use penumbra_proto::{StateReadProto as _, StateWriteProto as _};
15+
use penumbra_proto::{StateReadProto, StateWriteProto};
1316
use penumbra_sct::component::clock::EpochManager;
14-
use penumbra_sct::component::clock::EpochRead as _;
17+
use penumbra_sct::component::clock::EpochRead;
1518
use penumbra_stake::validator::Validator;
1619
use std::path::PathBuf;
1720
use tracing::instrument;
@@ -41,32 +44,36 @@ use crate::testnet::generate::TestnetConfig;
4144
/// - `client_id` (128 bytes)
4245
/// * Governance Signaling Proposals:
4346
/// - `commit hash` (255 bytes)
47+
/// - Close and re-open all *open* positions so that they are re-indexed.
4448
#[instrument]
4549
pub async fn migrate(
4650
storage: Storage,
4751
pd_home: PathBuf,
4852
genesis_start: Option<tendermint::time::Time>,
4953
) -> anyhow::Result<()> {
50-
// Setup:
54+
/* `Migration::prepare`: collect basic migration data, logging, initialize alt-storage if needed */
5155
let initial_state = storage.latest_snapshot();
56+
5257
let chain_id = initial_state.get_chain_id().await?;
5358
let root_hash = initial_state
5459
.root_hash()
5560
.await
5661
.expect("chain state has a root hash");
57-
let pre_upgrade_root_hash: RootHash = root_hash.into();
62+
5863
let pre_upgrade_height = initial_state
5964
.get_block_height()
6065
.await
6166
.expect("chain state has a block height");
6267
let post_upgrade_height = pre_upgrade_height.wrapping_add(1);
6368

64-
// We initialize a `StateDelta` and start by reaching into the JMT for all entries matching the
65-
// swap execution prefix. Then, we write each entry to the nv-storage.
69+
let pre_upgrade_root_hash: RootHash = root_hash.into();
70+
71+
/* `Migration::migrate`: reach into the chain state and perform an offline state transition */
6672
let mut delta = StateDelta::new(initial_state);
67-
tracing::info!("beginning migration steps");
73+
6874
let (migration_duration, post_upgrade_root_hash) = {
6975
let start_time = std::time::SystemTime::now();
76+
7077
// Adjust the length of `Validator` fields.
7178
truncate_validator_fields(&mut delta).await?;
7279

@@ -76,30 +83,26 @@ pub async fn migrate(
7683
// Adjust the length of governance proposal outcome fields.
7784
truncate_proposal_outcome_fields(&mut delta).await?;
7885

86+
// Re-index all open positions.
87+
reindex_dex_positions(&mut delta).await?;
88+
89+
// Reset the application height and halt flag.
90+
delta.ready_to_start();
91+
delta.put_block_height(0u64);
92+
93+
// Finally, commit the changes to the chain state.
7994
let post_upgrade_root_hash = storage.commit_in_place(delta).await?;
8095
tracing::info!(?post_upgrade_root_hash, "post-migration root hash");
8196

8297
(
83-
start_time.elapsed().expect("start time not set"),
98+
start_time.elapsed().expect("start is set"),
8499
post_upgrade_root_hash,
85100
)
86101
};
87-
tracing::info!("completed migration steps");
88-
89-
// Set halt bit to 0, so chain can start again.
90-
let migrated_state = storage.latest_snapshot();
91-
let mut delta = StateDelta::new(migrated_state);
92-
delta.ready_to_start();
93-
delta.put_block_height(0u64);
94-
let _ = storage
95-
.commit_in_place(delta)
96-
.await
97-
.context("failed to reset halt bit")?;
98-
storage.release().await;
99102

100-
// The migration is complete, now we need to generate a genesis file. To do this, we need
101-
// to lookup a validator view from the chain, and specify the post-upgrade app hash and
102-
// initial height.
103+
tracing::info!("migration completed, generating genesis and signing state...");
104+
105+
/* `Migration::complete`: the state transition has been performed, we prepare the checkpointed genesis and signing state */
103106
let app_state = penumbra_app::genesis::Content {
104107
chain_id,
105108
..Default::default()
@@ -110,22 +113,26 @@ pub async fn migrate(
110113
.to_vec()
111114
.try_into()
112115
.expect("infaillible conversion");
116+
113117
genesis.initial_height = post_upgrade_height as i64;
114118
genesis.genesis_time = genesis_start.unwrap_or_else(|| {
115119
let now = tendermint::time::Time::now();
116120
tracing::info!(%now, "no genesis time provided, detecting a testing setup");
117121
now
118122
});
123+
124+
tracing::info!("generating checkpointed genesis");
119125
let checkpoint = post_upgrade_root_hash.0.to_vec();
120126
let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint));
121127

128+
tracing::info!("writing genesis to disk");
122129
let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis");
123130
tracing::info!("genesis: {}", genesis_json);
124131
let genesis_path = pd_home.join("genesis.json");
125132
std::fs::write(genesis_path, genesis_json).expect("can write genesis");
126133

134+
tracing::info!("updating signing state");
127135
let validator_state_path = pd_home.join("priv_validator_state.json");
128-
129136
let fresh_validator_state = crate::testnet::generate::TestnetValidator::initial_state();
130137
std::fs::write(validator_state_path, fresh_validator_state).expect("can write validator state");
131138

@@ -135,12 +142,38 @@ pub async fn migrate(
135142
?pre_upgrade_root_hash,
136143
?post_upgrade_root_hash,
137144
duration = migration_duration.as_secs(),
138-
"successful migration!"
145+
"migration fully complete"
139146
);
140147

141148
Ok(())
142149
}
143150

151+
async fn reindex_dex_positions(delta: &mut StateDelta<Snapshot>) -> anyhow::Result<()> {
152+
tracing::info!("running dex re-indexing migration");
153+
let prefix_key_lp = penumbra_dex::state_key::all_positions();
154+
let stream_all_lp = delta.prefix::<Position>(&prefix_key_lp);
155+
let stream_open_lp = stream_all_lp.filter_map(|entry| async {
156+
match entry {
157+
Ok((_, lp)) if lp.state == position::State::Opened => Some(lp),
158+
_ => None,
159+
}
160+
});
161+
pin_mut!(stream_open_lp);
162+
163+
while let Some(lp) = stream_open_lp.next().await {
164+
// Re-hash the position, since the key is a bech32 string.
165+
let id = lp.id();
166+
// Close the position, adjusting all its index entries.
167+
delta.close_position_by_id(&id).await?;
168+
// Erase the position from the state, so that we circumvent the `update_position` guard.
169+
delta.delete(penumbra_dex::state_key::position_by_id(&id));
170+
// Open a position with the adjusted indexing logic.
171+
delta.open_position(lp).await?;
172+
}
173+
tracing::info!("completed dex migration");
174+
Ok(())
175+
}
176+
144177
/// * Validators:
145178
/// - `name` (140 bytes)
146179
/// - `website` (70 bytes)

crates/core/component/dex/src/component/circuit_breaker/value.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ mod tests {
241241

242242
let position = buy_1;
243243
state_tx
244-
.update_position_by_price_index(&None, &position, &position.id())
244+
.update_position_by_price_index(&position.id(), &None, &position)
245245
.expect("can update price index");
246246
state_tx.put(state_key::position_by_id(&id), position);
247247

crates/core/component/dex/src/component/position_manager.rs

+20-19
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,15 @@ pub trait PositionRead: StateRead {
6262
fn positions_by_price(
6363
&self,
6464
pair: &DirectedTradingPair,
65-
) -> Pin<Box<dyn Stream<Item = Result<position::Id>> + Send + 'static>> {
65+
) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
66+
{
6667
let prefix = engine::price_index::prefix(pair);
6768
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
68-
self.nonverifiable_prefix_raw(&prefix)
69+
self.nonverifiable_prefix(&prefix)
6970
.map(|entry| match entry {
70-
Ok((k, _)) => {
71+
Ok((k, lp)) => {
7172
let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
72-
Ok(position::Id(raw_id))
73+
Ok((position::Id(raw_id), lp))
7374
}
7475
Err(e) => Err(e),
7576
})
@@ -90,12 +91,9 @@ pub trait PositionRead: StateRead {
9091
async fn best_position(
9192
&self,
9293
pair: &DirectedTradingPair,
93-
) -> Result<Option<position::Position>> {
94+
) -> Result<Option<(position::Id, position::Position)>> {
9495
let mut positions_by_price = self.positions_by_price(pair);
95-
match positions_by_price.next().await.transpose()? {
96-
Some(id) => self.position_by_id(&id).await,
97-
None => Ok(None),
98-
}
96+
positions_by_price.next().await.transpose()
9997
}
10098

10199
/// Fetch the list of pending position closures.
@@ -205,7 +203,8 @@ pub trait PositionManager: StateWrite + PositionRead {
205203
new_state
206204
};
207205

208-
self.update_position(Some(prev_state), new_state).await?;
206+
self.update_position(id, Some(prev_state), new_state)
207+
.await?;
209208

210209
Ok(())
211210
}
@@ -280,7 +279,7 @@ pub trait PositionManager: StateWrite + PositionRead {
280279

281280
// Finally, record the new position state.
282281
self.record_proto(event::position_open(&position));
283-
self.update_position(None, position).await?;
282+
self.update_position(&id, None, position).await?;
284283

285284
Ok(())
286285
}
@@ -374,7 +373,8 @@ pub trait PositionManager: StateWrite + PositionRead {
374373
.map_err(|e| tracing::warn!(?e, "failed to record position execution"))
375374
.ok();
376375

377-
self.update_position(Some(prev_state), new_state).await
376+
self.update_position(&position_id, Some(prev_state), new_state)
377+
.await
378378
}
379379

380380
/// Withdraw from a closed position, incrementing its sequence number.
@@ -450,7 +450,8 @@ pub trait PositionManager: StateWrite + PositionRead {
450450
new_state
451451
};
452452

453-
self.update_position(Some(prev_state), new_state).await?;
453+
self.update_position(&position_id, Some(prev_state), new_state)
454+
.await?;
454455

455456
Ok(reserves)
456457
}
@@ -462,28 +463,28 @@ impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
462463
trait Inner: StateWrite {
463464
/// Writes a position to the state, updating all necessary indexes.
464465
///
465-
/// This should be the SOLE ENTRYPOINT for writing positions to the state.
466+
/// This should be the **SOLE ENTRYPOINT** for writing positions to the state.
466467
/// All other position changes exposed by the `PositionManager` should run through here.
467468
#[instrument(level = "debug", skip_all)]
468469
async fn update_position(
469470
&mut self,
471+
id: &position::Id,
470472
prev_state: Option<Position>,
471473
new_state: Position,
472474
) -> Result<Position> {
473-
let id = new_state.id();
474475
tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
475476
tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");
476477

477478
// Assert `update_position` state transitions invariants:
478479
Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;
479480

480481
// Update the DEX engine indices:
481-
self.update_position_by_price_index(&prev_state, &new_state, &id)?;
482-
self.update_position_by_inventory_index(&prev_state, &new_state, &id)?;
483-
self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id)
482+
self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
483+
self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
484484
.await?;
485-
self.update_trading_pair_position_counter(&prev_state, &new_state, &id)
485+
self.update_trading_pair_position_counter(&prev_state, &new_state)
486486
.await?;
487+
self.update_position_by_price_index(&id, &prev_state, &new_state)?;
487488

488489
self.put(state_key::position_by_id(&id), new_state.clone());
489490
Ok(new_state)

crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite {
7676
/// │ └──┘
7777
async fn update_asset_by_base_liquidity_index(
7878
&mut self,
79+
id: &position::Id,
7980
prev_state: &Option<Position>,
8081
new_state: &Position,
81-
id: &position::Id,
8282
) -> Result<()> {
8383
// We need to reconstruct the position's previous contribution and compute
8484
// its new contribution to the index. We do this for each asset in the pair

crates/core/component/dex/src/component/position_manager/counter.rs

-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ pub(crate) trait PositionCounter: StateWrite {
4141
&mut self,
4242
prev_state: &Option<Position>,
4343
new_state: &Position,
44-
_id: &position::Id,
4544
) -> Result<()> {
4645
use position::State::*;
4746
let trading_pair = new_state.phi.pair;

crates/core/component/dex/src/component/position_manager/inventory_index.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ use position::State::*;
1313
pub(super) trait PositionByInventoryIndex: StateWrite {
1414
fn update_position_by_inventory_index(
1515
&mut self,
16+
position_id: &position::Id,
1617
prev_state: &Option<Position>,
1718
new_state: &Position,
18-
position_id: &position::Id,
1919
) -> Result<()> {
2020
// Clear an existing record of the position, since changes to the
2121
// reserves or the position state might have invalidated it.

crates/core/component/dex/src/component/position_manager/price_index.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use cnidarium::StateWrite;
2+
use penumbra_proto::StateWriteProto;
23

34
use crate::{
45
lp::position::{self, Position},
@@ -12,9 +13,9 @@ use position::State::*;
1213
pub(crate) trait PositionByPriceIndex: StateWrite {
1314
fn update_position_by_price_index(
1415
&mut self,
16+
position_id: &position::Id,
1517
prev_state: &Option<Position>,
1618
new_state: &Position,
17-
position_id: &position::Id,
1819
) -> Result<()> {
1920
// Clear an existing record for the position, since changes to the
2021
// reserves or the position state might have invalidated it.
@@ -57,7 +58,10 @@ trait Inner: StateWrite {
5758
end: pair.asset_2(),
5859
};
5960
let phi12 = phi.component.clone();
60-
self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]);
61+
self.nonverifiable_put(
62+
engine::price_index::key(&pair12, &phi12, &id),
63+
position.clone(),
64+
);
6165
tracing::debug!("indexing position for 1=>2 trades");
6266
}
6367

@@ -68,7 +72,10 @@ trait Inner: StateWrite {
6872
end: pair.asset_1(),
6973
};
7074
let phi21 = phi.component.flip();
71-
self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]);
75+
self.nonverifiable_put(
76+
engine::price_index::key(&pair21, &phi21, &id),
77+
position.clone(),
78+
);
7279
tracing::debug!("indexing position for 2=>1 trades");
7380
}
7481
}

0 commit comments

Comments
 (0)