From f3687af8276f3b73449a5c77c53372e114cc046d Mon Sep 17 00:00:00 2001 From: Atris Date: Mon, 29 Jul 2024 17:35:20 +0200 Subject: [PATCH 01/13] feat: swaps indexing --- crates/bin/pindexer/src/dex/dex.sql | 32 ++++++++++----- crates/bin/pindexer/src/dex/mod.rs | 62 ++++++++++++++++++++++++++++- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 8fbc2f764c..4eaea8fa38 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -14,19 +14,21 @@ DROP DOMAIN IF EXISTS Amount; CREATE DOMAIN Amount AS NUMERIC(39, 0) NOT NULL; DROP TYPE IF EXISTS Value CASCADE; -CREATE TYPE Value AS ( - amount Amount, - asset BYTEA +CREATE TYPE Value AS +( + amount Amount, + asset BYTEA ); -- Keeps track of changes to the dex's value circuit breaker. -CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change ( - -- The asset being moved into or out of the dex. - asset_id BYTEA NOT NULL, - -- The flow, either positive, or negative, into the dex via this particular asset. - -- - -- Because we're dealing with arbitrary assets, we need to use something which can store u128 - flow Amount +CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change +( + -- The asset being moved into or out of the dex. + asset_id BYTEA NOT NULL, + -- The flow, either positive, or negative, into the dex via this particular asset. + -- + -- Because we're dealing with arbitrary assets, we need to use something which can store u128 + flow Amount ); -- One step of an execution trace. @@ -113,3 +115,13 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( -- The end asset for this execution. context_end BYTEA NOT NULL ); + +--- Represents instances where swap executions happened. +CREATE TABLE IF NOT EXISTS swap +( + height BIGINT PRIMARY KEY, + input Value, + output Value, + trace_start INTEGER REFERENCES dex_trace (id), + trace_end INTEGER REFERENCES dex_trace (id) +); \ No newline at end of file diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 03428bd448..aa4c667bc6 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -7,6 +7,7 @@ use penumbra_dex::lp::position::{Id, Position}; use penumbra_dex::lp::{self, TradingFunction}; use penumbra_dex::{DirectedTradingPair, SwapExecution}; use penumbra_num::Amount; +use penumbra_proto::core::component::dex::v1::BatchSwapOutputData; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; @@ -33,6 +34,11 @@ enum Event { height: u64, execution: SwapExecution, }, + /// A parsed version of [pb::EventBatchSwap] + Swap { + height: u64, + execution: SwapExecution, + }, /// A parsed version of [pb::EventPositionOpen] PositionOpen { height: u64, position: Position }, /// A parsed version of [pb::EventPositionWithdraw] @@ -58,10 +64,11 @@ enum Event { } impl Event { - const NAMES: [&'static str; 7] = [ + const NAMES: [&'static str; 8] = [ "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", "penumbra.core.component.dex.v1.EventArbExecution", + "penumbra.core.component.dex.v1.EventBatchSwap", "penumbra.core.component.dex.v1.EventPositionWithdraw", "penumbra.core.component.dex.v1.EventPositionOpen", "penumbra.core.component.dex.v1.EventPositionClose", @@ -328,6 +335,49 @@ impl Event { .await?; Ok(()) } + Event::Swap { height, execution } => { + let mut trace_start = None; + let mut trace_end = None; + for trace in &execution.traces { + let mut step_start = None; + let mut step_end = None; + for step in trace { + let (id,): (i32,) = sqlx::query_as( + r#"INSERT INTO trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, + ) + .bind(step.amount.to_string()) + .bind(Sql::from(step.asset_id)) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = step_start { + step_start = Some(id); + } + step_end = Some(id); + } + let (id,): (i32,) = sqlx::query_as( + r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#, + ) + .bind(step_start) + .bind(step_end) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = trace_start { + trace_start = Some(id); + } + trace_end = Some(id); + } + sqlx::query(r#"INSERT INTO swap VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS AMOUNT), $5), $6, $7);"#) + .bind(i64::try_from(*height)?) + .bind(execution.input.amount.to_string()) + .bind(Sql::from(execution.input.asset_id)) + .bind(execution.output.amount.to_string()) + .bind(Sql::from(execution.output.asset_id)) + .bind(trace_start) + .bind(trace_end) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } } } } @@ -467,6 +517,16 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { context, }) } + // Batch Swap + x if x == Event::NAMES[3] => { + let pe = pb::EventBatchSwap::from_event(event.as_ref())?; + let height = event.block_height; + let execution = pe + .swap_execution_1_for_2 + .ok_or(anyhow!("missing swap execution"))? + .try_into()?; + Ok(Self::Swap { height, execution }) + } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } } From cd22e6b26365b8ffc28f6d397f5f31548fa6a773 Mon Sep 17 00:00:00 2001 From: Atris Date: Tue, 30 Jul 2024 18:42:58 +0200 Subject: [PATCH 02/13] fix: rebase --- crates/bin/pindexer/src/dex/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index aa4c667bc6..dc43bdde5d 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -35,7 +35,7 @@ enum Event { execution: SwapExecution, }, /// A parsed version of [pb::EventBatchSwap] - Swap { + BatchSwap { height: u64, execution: SwapExecution, }, @@ -68,11 +68,11 @@ impl Event { "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", "penumbra.core.component.dex.v1.EventArbExecution", - "penumbra.core.component.dex.v1.EventBatchSwap", "penumbra.core.component.dex.v1.EventPositionWithdraw", "penumbra.core.component.dex.v1.EventPositionOpen", "penumbra.core.component.dex.v1.EventPositionClose", "penumbra.core.component.dex.v1.EventPositionExecution", + "penumbra.core.component.dex.v1.EventBatchSwap", ]; /// Index this event, using the handle to the postgres transaction. @@ -335,7 +335,7 @@ impl Event { .await?; Ok(()) } - Event::Swap { height, execution } => { + Event::BatchSwap { height, execution } => { let mut trace_start = None; let mut trace_end = None; for trace in &execution.traces { @@ -518,14 +518,14 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { }) } // Batch Swap - x if x == Event::NAMES[3] => { + x if x == Event::NAMES[7] => { let pe = pb::EventBatchSwap::from_event(event.as_ref())?; let height = event.block_height; let execution = pe .swap_execution_1_for_2 .ok_or(anyhow!("missing swap execution"))? .try_into()?; - Ok(Self::Swap { height, execution }) + Ok(Self::BatchSwap { height, execution }) } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } From 3e1cbb9f6236e6c148ba7c88148f387b08b95215 Mon Sep 17 00:00:00 2001 From: Atris Date: Tue, 30 Jul 2024 18:57:06 +0200 Subject: [PATCH 03/13] fix: lint --- crates/bin/pindexer/src/dex/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index dc43bdde5d..e89c3a4f57 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -7,7 +7,6 @@ use penumbra_dex::lp::position::{Id, Position}; use penumbra_dex::lp::{self, TradingFunction}; use penumbra_dex::{DirectedTradingPair, SwapExecution}; use penumbra_num::Amount; -use penumbra_proto::core::component::dex::v1::BatchSwapOutputData; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; From 1700b49c8d9861c61baf75e16314437fbab78bc7 Mon Sep 17 00:00:00 2001 From: Atris Date: Tue, 30 Jul 2024 20:16:44 +0200 Subject: [PATCH 04/13] fix: add more data for swaps --- crates/bin/pindexer/src/dex/dex.sql | 10 +++++++++- crates/bin/pindexer/src/dex/mod.rs | 31 +++++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 4eaea8fa38..9413adffdb 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -123,5 +123,13 @@ CREATE TABLE IF NOT EXISTS swap input Value, output Value, trace_start INTEGER REFERENCES dex_trace (id), - trace_end INTEGER REFERENCES dex_trace (id) + trace_end INTEGER REFERENCES dex_trace (id), + pair_asset_id_1 BYTEA NOT NULL, + pair_asset_id_2 BYTEA NOT NULL, + unfilled_1 Amount NOT NULL, + unfilled_2 Amount NOT NULL, + delta_1 Amount NOT NULL, + delta_2 Amount NOT NULL, + lambda_1 Amount NOT NULL, + lambda_2 Amount NOT NULL ); \ No newline at end of file diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index e89c3a4f57..29d9ba7541 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -5,7 +5,7 @@ use cometindex::async_trait; use penumbra_asset::asset::Id as AssetId; use penumbra_dex::lp::position::{Id, Position}; use penumbra_dex::lp::{self, TradingFunction}; -use penumbra_dex::{DirectedTradingPair, SwapExecution}; +use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution}; use penumbra_num::Amount; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; @@ -37,6 +37,7 @@ enum Event { BatchSwap { height: u64, execution: SwapExecution, + output_data: BatchSwapOutputData, }, /// A parsed version of [pb::EventPositionOpen] PositionOpen { height: u64, position: Position }, @@ -334,7 +335,11 @@ impl Event { .await?; Ok(()) } - Event::BatchSwap { height, execution } => { + Event::BatchSwap { + height, + execution, + output_data, + } => { let mut trace_start = None; let mut trace_end = None; for trace in &execution.traces { @@ -365,7 +370,9 @@ impl Event { } trace_end = Some(id); } - sqlx::query(r#"INSERT INTO swap VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS AMOUNT), $5), $6, $7);"#) + sqlx::query(r#"INSERT INTO swap VALUES ($1, (CAST($2 AS Amount), $3), + (CAST($4 AS AMOUNT), $5), $6, $7, $8, $9, CAST($10 AS AMOUNT), CAST($11 AS AMOUNT), + CAST($12 AS AMOUNT), CAST($13 AS AMOUNT), CAST($14 AS AMOUNT), CAST($15 AS AMOUNT),;"#) .bind(i64::try_from(*height)?) .bind(execution.input.amount.to_string()) .bind(Sql::from(execution.input.asset_id)) @@ -373,6 +380,14 @@ impl Event { .bind(Sql::from(execution.output.asset_id)) .bind(trace_start) .bind(trace_end) + .bind(Sql::from(output_data.trading_pair.asset_1())) + .bind(Sql::from(output_data.trading_pair.asset_2())) + .bind(output_data.unfilled_1.to_string()) + .bind(output_data.unfilled_2.to_string()) + .bind(output_data.delta_1.to_string()) + .bind(output_data.delta_2.to_string()) + .bind(output_data.lambda_1.to_string()) + .bind(output_data.lambda_2.to_string()) .execute(dbtx.as_mut()) .await?; Ok(()) @@ -520,11 +535,19 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { x if x == Event::NAMES[7] => { let pe = pb::EventBatchSwap::from_event(event.as_ref())?; let height = event.block_height; + let output_data = pe + .batch_swap_output_data + .ok_or(anyhow!("missing swap execution"))? + .try_into()?; let execution = pe .swap_execution_1_for_2 .ok_or(anyhow!("missing swap execution"))? .try_into()?; - Ok(Self::BatchSwap { height, execution }) + Ok(Self::BatchSwap { + height, + execution, + output_data: batch_swap_output_data, + }) } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } From 1f9696cc6a8587e6a54be3d20660071425f9fd04 Mon Sep 17 00:00:00 2001 From: Atris Date: Tue, 30 Jul 2024 20:25:25 +0200 Subject: [PATCH 05/13] fix: add more data for swaps --- crates/bin/pindexer/src/dex/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 29d9ba7541..4eaed47d8f 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -546,7 +546,7 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { Ok(Self::BatchSwap { height, execution, - output_data: batch_swap_output_data, + output_data, }) } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), From 8e8b3d61191563e9bebf3aa2e0465b278acb3fa8 Mon Sep 17 00:00:00 2001 From: Atris Date: Tue, 30 Jul 2024 21:09:51 +0200 Subject: [PATCH 06/13] fix: rename swap to dex_swap --- crates/bin/pindexer/src/dex/dex.sql | 2 +- crates/bin/pindexer/src/dex/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 9413adffdb..0596934685 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -117,7 +117,7 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( ); --- Represents instances where swap executions happened. -CREATE TABLE IF NOT EXISTS swap +CREATE TABLE IF NOT EXISTS dex_swap ( height BIGINT PRIMARY KEY, input Value, diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 4eaed47d8f..725f9c25f3 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -370,7 +370,7 @@ impl Event { } trace_end = Some(id); } - sqlx::query(r#"INSERT INTO swap VALUES ($1, (CAST($2 AS Amount), $3), + sqlx::query(r#"INSERT INTO dex_swap VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS AMOUNT), $5), $6, $7, $8, $9, CAST($10 AS AMOUNT), CAST($11 AS AMOUNT), CAST($12 AS AMOUNT), CAST($13 AS AMOUNT), CAST($14 AS AMOUNT), CAST($15 AS AMOUNT),;"#) .bind(i64::try_from(*height)?) From 980a4095c80f797ddfce22a80f7b48f11c1c9c0a Mon Sep 17 00:00:00 2001 From: Atris Date: Wed, 31 Jul 2024 16:08:37 +0200 Subject: [PATCH 07/13] fix: remove redundant fields --- crates/bin/pindexer/src/dex/dex.sql | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 0596934685..d7ec3c835e 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -120,8 +120,6 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( CREATE TABLE IF NOT EXISTS dex_swap ( height BIGINT PRIMARY KEY, - input Value, - output Value, trace_start INTEGER REFERENCES dex_trace (id), trace_end INTEGER REFERENCES dex_trace (id), pair_asset_id_1 BYTEA NOT NULL, From b539cee9bd34181ec34691f9b0307e00fa665aaf Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Fri, 9 Aug 2024 11:06:52 -0700 Subject: [PATCH 08/13] pindexer: dex: Format SQL --- crates/bin/pindexer/src/dex/dex.sql | 49 ++++++++++++++--------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index d7ec3c835e..9d83227a52 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -14,21 +14,19 @@ DROP DOMAIN IF EXISTS Amount; CREATE DOMAIN Amount AS NUMERIC(39, 0) NOT NULL; DROP TYPE IF EXISTS Value CASCADE; -CREATE TYPE Value AS -( - amount Amount, - asset BYTEA +CREATE TYPE Value AS ( + amount Amount, + asset BYTEA ); -- Keeps track of changes to the dex's value circuit breaker. -CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change -( - -- The asset being moved into or out of the dex. - asset_id BYTEA NOT NULL, - -- The flow, either positive, or negative, into the dex via this particular asset. - -- - -- Because we're dealing with arbitrary assets, we need to use something which can store u128 - flow Amount +CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change ( + -- The asset being moved into or out of the dex. + asset_id BYTEA NOT NULL, + -- The flow, either positive, or negative, into the dex via this particular asset. + -- + -- Because we're dealing with arbitrary assets, we need to use something which can store u128 + flow Amount ); -- One step of an execution trace. @@ -117,17 +115,16 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( ); --- Represents instances where swap executions happened. -CREATE TABLE IF NOT EXISTS dex_swap -( - height BIGINT PRIMARY KEY, - trace_start INTEGER REFERENCES dex_trace (id), - trace_end INTEGER REFERENCES dex_trace (id), - pair_asset_id_1 BYTEA NOT NULL, - pair_asset_id_2 BYTEA NOT NULL, - unfilled_1 Amount NOT NULL, - unfilled_2 Amount NOT NULL, - delta_1 Amount NOT NULL, - delta_2 Amount NOT NULL, - lambda_1 Amount NOT NULL, - lambda_2 Amount NOT NULL -); \ No newline at end of file +CREATE TABLE IF NOT EXISTS dex_swap ( + height BIGINT PRIMARY KEY, + trace_start INTEGER REFERENCES dex_trace (id), + trace_end INTEGER REFERENCES dex_trace (id), + pair_asset_id_1 BYTEA NOT NULL, + pair_asset_id_2 BYTEA NOT NULL, + unfilled_1 Amount NOT NULL, + unfilled_2 Amount NOT NULL, + delta_1 Amount NOT NULL, + delta_2 Amount NOT NULL, + lambda_1 Amount NOT NULL, + lambda_2 Amount NOT NULL +); From 6c8eb273ad682894970f86470c8880d54c31323c Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Fri, 9 Aug 2024 12:09:30 -0700 Subject: [PATCH 09/13] pindexer: dex: rename dex_swap to dex_batch_swap --- crates/bin/pindexer/src/dex/dex.sql | 2 +- crates/bin/pindexer/src/dex/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 9d83227a52..47efd15f12 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -115,7 +115,7 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( ); --- Represents instances where swap executions happened. -CREATE TABLE IF NOT EXISTS dex_swap ( +CREATE TABLE IF NOT EXISTS dex_batch_swap ( height BIGINT PRIMARY KEY, trace_start INTEGER REFERENCES dex_trace (id), trace_end INTEGER REFERENCES dex_trace (id), diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 725f9c25f3..342d5310b1 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -370,7 +370,7 @@ impl Event { } trace_end = Some(id); } - sqlx::query(r#"INSERT INTO dex_swap VALUES ($1, (CAST($2 AS Amount), $3), + sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS AMOUNT), $5), $6, $7, $8, $9, CAST($10 AS AMOUNT), CAST($11 AS AMOUNT), CAST($12 AS AMOUNT), CAST($13 AS AMOUNT), CAST($14 AS AMOUNT), CAST($15 AS AMOUNT),;"#) .bind(i64::try_from(*height)?) From 98028c64edcdaf01af0dfdfe4dca7b31197fa274 Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Fri, 9 Aug 2024 15:12:58 -0700 Subject: [PATCH 10/13] pindexer: dex: fill out batch swap information Adds the missing trace for the other direction in the swap. --- crates/bin/pindexer/src/dex/dex.sql | 22 ++--- crates/bin/pindexer/src/dex/mod.rs | 131 ++++++++++++---------------- 2 files changed, 70 insertions(+), 83 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 47efd15f12..93c522b4e8 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -117,14 +117,16 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution ( --- Represents instances where swap executions happened. CREATE TABLE IF NOT EXISTS dex_batch_swap ( height BIGINT PRIMARY KEY, - trace_start INTEGER REFERENCES dex_trace (id), - trace_end INTEGER REFERENCES dex_trace (id), - pair_asset_id_1 BYTEA NOT NULL, - pair_asset_id_2 BYTEA NOT NULL, - unfilled_1 Amount NOT NULL, - unfilled_2 Amount NOT NULL, - delta_1 Amount NOT NULL, - delta_2 Amount NOT NULL, - lambda_1 Amount NOT NULL, - lambda_2 Amount NOT NULL + trace12_start INTEGER REFERENCES dex_trace (id), + trace12_end INTEGER REFERENCES dex_trace (id), + trace21_start INTEGER REFERENCES dex_trace (id), + trace21_end INTEGER REFERENCES dex_trace (id), + pair_asset1 BYTEA NOT NULL, + pair_asset2 BYTEA NOT NULL, + unfilled1 Amount NOT NULL, + unfilled2 Amount NOT NULL, + delta1 Amount NOT NULL, + delta2 Amount NOT NULL, + lambda1 Amount NOT NULL, + lambda2 Amount NOT NULL ); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 342d5310b1..0f2f42a37f 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -13,6 +13,45 @@ use sqlx::{PgPool, Postgres, Transaction}; use crate::sql::Sql; use crate::{AppView, ContextualizedEvent, PgTransaction}; +/// Insert a swap execution into the database. +/// +/// This returns the start and end indices of its trace. +async fn insert_swap_execution<'d>( + dbtx: &mut Transaction<'d, Postgres>, + execution: &SwapExecution, +) -> anyhow::Result<(Option, Option)> { + let mut trace_start = None; + let mut trace_end = None; + for trace in &execution.traces { + let mut step_start = None; + let mut step_end = None; + for step in trace { + let (id,): (i32,) = sqlx::query_as( + r#"INSERT INTO trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, + ) + .bind(step.amount.to_string()) + .bind(Sql::from(step.asset_id)) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = step_start { + step_start = Some(id); + } + step_end = Some(id); + } + let (id,): (i32,) = + sqlx::query_as(r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#) + .bind(step_start) + .bind(step_end) + .fetch_one(dbtx.as_mut()) + .await?; + if let None = trace_start { + trace_start = Some(id); + } + trace_end = Some(id); + } + Ok((trace_start, trace_end)) +} + /// One of the possible events that we care about. #[derive(Clone, Debug)] enum Event { @@ -36,7 +75,8 @@ enum Event { /// A parsed version of [pb::EventBatchSwap] BatchSwap { height: u64, - execution: SwapExecution, + execution12: SwapExecution, + execution21: SwapExecution, output_data: BatchSwapOutputData, }, /// A parsed version of [pb::EventPositionOpen] @@ -123,36 +163,7 @@ impl Event { Ok(()) } Event::ArbExecution { height, execution } => { - let mut trace_start = None; - let mut trace_end = None; - for trace in &execution.traces { - let mut step_start = None; - let mut step_end = None; - for step in trace { - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, - ) - .bind(step.amount.to_string()) - .bind(Sql::from(step.asset_id)) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = step_start { - step_start = Some(id); - } - step_end = Some(id); - } - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#, - ) - .bind(step_start) - .bind(step_end) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = trace_start { - trace_start = Some(id); - } - trace_end = Some(id); - } + let (trace_start, trace_end) = insert_swap_execution(dbtx, execution).await?; sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#) .bind(i64::try_from(*height)?) .bind(execution.input.amount.to_string()) @@ -337,49 +348,18 @@ impl Event { } Event::BatchSwap { height, - execution, + execution12, + execution21, output_data, } => { - let mut trace_start = None; - let mut trace_end = None; - for trace in &execution.traces { - let mut step_start = None; - let mut step_end = None; - for step in trace { - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, - ) - .bind(step.amount.to_string()) - .bind(Sql::from(step.asset_id)) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = step_start { - step_start = Some(id); - } - step_end = Some(id); - } - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#, - ) - .bind(step_start) - .bind(step_end) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = trace_start { - trace_start = Some(id); - } - trace_end = Some(id); - } - sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, (CAST($2 AS Amount), $3), - (CAST($4 AS AMOUNT), $5), $6, $7, $8, $9, CAST($10 AS AMOUNT), CAST($11 AS AMOUNT), - CAST($12 AS AMOUNT), CAST($13 AS AMOUNT), CAST($14 AS AMOUNT), CAST($15 AS AMOUNT),;"#) + let (trace12_start, trace12_end) = insert_swap_execution(dbtx, execution12).await?; + let (trace21_start, trace21_end) = insert_swap_execution(dbtx, execution21).await?; + sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#) .bind(i64::try_from(*height)?) - .bind(execution.input.amount.to_string()) - .bind(Sql::from(execution.input.asset_id)) - .bind(execution.output.amount.to_string()) - .bind(Sql::from(execution.output.asset_id)) - .bind(trace_start) - .bind(trace_end) + .bind(trace12_start) + .bind(trace12_end) + .bind(trace21_start) + .bind(trace21_end) .bind(Sql::from(output_data.trading_pair.asset_1())) .bind(Sql::from(output_data.trading_pair.asset_2())) .bind(output_data.unfilled_1.to_string()) @@ -539,13 +519,18 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { .batch_swap_output_data .ok_or(anyhow!("missing swap execution"))? .try_into()?; - let execution = pe + let execution12 = pe .swap_execution_1_for_2 .ok_or(anyhow!("missing swap execution"))? .try_into()?; + let execution21 = pe + .swap_execution_2_for_1 + .ok_or(anyhow!("missing swap execution"))? + .try_into()?; Ok(Self::BatchSwap { height, - execution, + execution12, + execution21, output_data, }) } From 77ba1eee605704c7b6a18a82ab2ae2a942bd3564 Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Fri, 9 Aug 2024 16:46:43 -0700 Subject: [PATCH 11/13] pindexer: dex: add indexing for swap events --- crates/bin/pindexer/src/dex/dex.sql | 12 +++++++ crates/bin/pindexer/src/dex/mod.rs | 51 +++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 93c522b4e8..744c456a26 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -130,3 +130,15 @@ CREATE TABLE IF NOT EXISTS dex_batch_swap ( lambda1 Amount NOT NULL, lambda2 Amount NOT NULL ); + +-- Represents instances of invididual swaps into the batch. +CREATE TABLE IF NOT EXISTS dex_swap ( + id SERIAL PRIMARY KEY, + height BIGINT NOT NULL, + value1 Value, + value2 Value, +); + +CREATE INDEX ON dex_swap(height, id); +CREATE INDEX ON dex_swap(((value1).asset)); +CREATE INDEX ON dex_swap(((value2).asset)); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index 0f2f42a37f..f473aad5f3 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -5,7 +5,7 @@ use cometindex::async_trait; use penumbra_asset::asset::Id as AssetId; use penumbra_dex::lp::position::{Id, Position}; use penumbra_dex::lp::{self, TradingFunction}; -use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution}; +use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair}; use penumbra_num::Amount; use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; use sqlx::{PgPool, Postgres, Transaction}; @@ -101,10 +101,17 @@ enum Event { prev_reserves_2: Amount, context: DirectedTradingPair, }, + /// A parsed version of [pb::EventSwap] + Swap { + height: u64, + trading_pair: TradingPair, + delta_1_i: Amount, + delta_2_i: Amount, + }, } impl Event { - const NAMES: [&'static str; 8] = [ + const NAMES: [&'static str; 9] = [ "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", "penumbra.core.component.dex.v1.EventArbExecution", @@ -113,6 +120,7 @@ impl Event { "penumbra.core.component.dex.v1.EventPositionClose", "penumbra.core.component.dex.v1.EventPositionExecution", "penumbra.core.component.dex.v1.EventBatchSwap", + "penumbra.core.component.dex.v1.EventSwap", ]; /// Index this event, using the handle to the postgres transaction. @@ -372,6 +380,22 @@ impl Event { .await?; Ok(()) } + Event::Swap { + height, + trading_pair, + delta_1_i, + delta_2_i, + } => { + sqlx::query(r#"INSERT INTO dex_swap VALUES (DEFAULT, $1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5));"#) + .bind(i64::try_from(*height)?) + .bind(delta_1_i.to_string()) + .bind(Sql::from(trading_pair.asset_1())) + .bind(delta_2_i.to_string()) + .bind(Sql::from(trading_pair.asset_2())) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } } } } @@ -534,6 +558,29 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { output_data, }) } + // Swap + x if x == Event::NAMES[8] => { + let pe = pb::EventSwap::from_event(event.as_ref())?; + let height = event.block_height; + let trading_pair = pe + .trading_pair + .expect("trading_pair should be present") + .try_into()?; + let delta_1_i = pe + .delta_1_i + .expect("delta_1_i should be present") + .try_into()?; + let delta_2_i = pe + .delta_2_i + .expect("delta_2_i should be present") + .try_into()?; + Ok(Self::Swap { + height, + trading_pair, + delta_1_i, + delta_2_i, + }) + } x => Err(anyhow!(format!("unrecognized event kind: {x}"))), } } From 63c892976eb313695971a2e851a13f51d195e053 Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Fri, 9 Aug 2024 16:49:42 -0700 Subject: [PATCH 12/13] pindexer: dex: create indices for dex batch swaps --- crates/bin/pindexer/src/dex/dex.sql | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index 744c456a26..f3339a3737 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -121,8 +121,8 @@ CREATE TABLE IF NOT EXISTS dex_batch_swap ( trace12_end INTEGER REFERENCES dex_trace (id), trace21_start INTEGER REFERENCES dex_trace (id), trace21_end INTEGER REFERENCES dex_trace (id), - pair_asset1 BYTEA NOT NULL, - pair_asset2 BYTEA NOT NULL, + asset1 BYTEA NOT NULL, + asset2 BYTEA NOT NULL, unfilled1 Amount NOT NULL, unfilled2 Amount NOT NULL, delta1 Amount NOT NULL, @@ -131,6 +131,10 @@ CREATE TABLE IF NOT EXISTS dex_batch_swap ( lambda2 Amount NOT NULL ); +CREATE INDEX ON dex_batch_swap(height); +CREATE INDEX ON dex_batch_swap(asset1, height); +CREATE INDEX ON dex_batch_swap(asset2, height); + -- Represents instances of invididual swaps into the batch. CREATE TABLE IF NOT EXISTS dex_swap ( id SERIAL PRIMARY KEY, From c94ccd5b45a22c256d788c9939c0b149077e0063 Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Fri, 9 Aug 2024 16:58:26 -0700 Subject: [PATCH 13/13] pindexer: dex: fix remaining sql issues --- crates/bin/pindexer/src/dex/dex.sql | 2 +- crates/bin/pindexer/src/dex/mod.rs | 30 +++++++++++++++++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql index f3339a3737..720dbce968 100644 --- a/crates/bin/pindexer/src/dex/dex.sql +++ b/crates/bin/pindexer/src/dex/dex.sql @@ -140,7 +140,7 @@ CREATE TABLE IF NOT EXISTS dex_swap ( id SERIAL PRIMARY KEY, height BIGINT NOT NULL, value1 Value, - value2 Value, + value2 Value ); CREATE INDEX ON dex_swap(height, id); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs index f473aad5f3..f084a25d08 100644 --- a/crates/bin/pindexer/src/dex/mod.rs +++ b/crates/bin/pindexer/src/dex/mod.rs @@ -18,8 +18,12 @@ use crate::{AppView, ContextualizedEvent, PgTransaction}; /// This returns the start and end indices of its trace. async fn insert_swap_execution<'d>( dbtx: &mut Transaction<'d, Postgres>, - execution: &SwapExecution, + execution: Option<&SwapExecution>, ) -> anyhow::Result<(Option, Option)> { + let execution = match execution { + None => return Ok((None, None)), + Some(e) => e, + }; let mut trace_start = None; let mut trace_end = None; for trace in &execution.traces { @@ -27,7 +31,7 @@ async fn insert_swap_execution<'d>( let mut step_end = None; for step in trace { let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, + r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, ) .bind(step.amount.to_string()) .bind(Sql::from(step.asset_id)) @@ -39,7 +43,7 @@ async fn insert_swap_execution<'d>( step_end = Some(id); } let (id,): (i32,) = - sqlx::query_as(r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#) + sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#) .bind(step_start) .bind(step_end) .fetch_one(dbtx.as_mut()) @@ -75,8 +79,8 @@ enum Event { /// A parsed version of [pb::EventBatchSwap] BatchSwap { height: u64, - execution12: SwapExecution, - execution21: SwapExecution, + execution12: Option, + execution21: Option, output_data: BatchSwapOutputData, }, /// A parsed version of [pb::EventPositionOpen] @@ -171,7 +175,7 @@ impl Event { Ok(()) } Event::ArbExecution { height, execution } => { - let (trace_start, trace_end) = insert_swap_execution(dbtx, execution).await?; + let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?; sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#) .bind(i64::try_from(*height)?) .bind(execution.input.amount.to_string()) @@ -360,8 +364,10 @@ impl Event { execution21, output_data, } => { - let (trace12_start, trace12_end) = insert_swap_execution(dbtx, execution12).await?; - let (trace21_start, trace21_end) = insert_swap_execution(dbtx, execution21).await?; + let (trace12_start, trace12_end) = + insert_swap_execution(dbtx, execution12.as_ref()).await?; + let (trace21_start, trace21_end) = + insert_swap_execution(dbtx, execution21.as_ref()).await?; sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#) .bind(i64::try_from(*height)?) .bind(trace12_start) @@ -545,12 +551,12 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event { .try_into()?; let execution12 = pe .swap_execution_1_for_2 - .ok_or(anyhow!("missing swap execution"))? - .try_into()?; + .map(|x| x.try_into()) + .transpose()?; let execution21 = pe .swap_execution_2_for_1 - .ok_or(anyhow!("missing swap execution"))? - .try_into()?; + .map(|x| x.try_into()) + .transpose()?; Ok(Self::BatchSwap { height, execution12,