Skip to content

Commit

Permalink
pindexer: dex: fix remaining sql issues
Browse files Browse the repository at this point in the history
  • Loading branch information
cronokirby committed Aug 9, 2024
1 parent 63c8929 commit c94ccd5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/dex/dex.sql
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ CREATE TABLE IF NOT EXISTS dex_swap (
id SERIAL PRIMARY KEY,
height BIGINT NOT NULL,
value1 Value,
value2 Value,
value2 Value
);

CREATE INDEX ON dex_swap(height, id);
Expand Down
30 changes: 18 additions & 12 deletions crates/bin/pindexer/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ use crate::{AppView, ContextualizedEvent, PgTransaction};
/// This returns the start and end indices of its trace.
async fn insert_swap_execution<'d>(
dbtx: &mut Transaction<'d, Postgres>,
execution: &SwapExecution,
execution: Option<&SwapExecution>,
) -> anyhow::Result<(Option<i32>, Option<i32>)> {
let execution = match execution {
None => return Ok((None, None)),
Some(e) => e,
};
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#,
r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#,
)
.bind(step.amount.to_string())
.bind(Sql::from(step.asset_id))
Expand All @@ -39,7 +43,7 @@ async fn insert_swap_execution<'d>(
step_end = Some(id);
}
let (id,): (i32,) =
sqlx::query_as(r#"INSERT INTO trace VALUES (DEFAULT, $1, $2) RETURNING id;"#)
sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
Expand Down Expand Up @@ -75,8 +79,8 @@ enum Event {
/// A parsed version of [pb::EventBatchSwap]
BatchSwap {
height: u64,
execution12: SwapExecution,
execution21: SwapExecution,
execution12: Option<SwapExecution>,
execution21: Option<SwapExecution>,
output_data: BatchSwapOutputData,
},
/// A parsed version of [pb::EventPositionOpen]
Expand Down Expand Up @@ -171,7 +175,7 @@ impl Event {
Ok(())
}
Event::ArbExecution { height, execution } => {
let (trace_start, trace_end) = insert_swap_execution(dbtx, execution).await?;
let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?;
sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#)
.bind(i64::try_from(*height)?)
.bind(execution.input.amount.to_string())
Expand Down Expand Up @@ -360,8 +364,10 @@ impl Event {
execution21,
output_data,
} => {
let (trace12_start, trace12_end) = insert_swap_execution(dbtx, execution12).await?;
let (trace21_start, trace21_end) = insert_swap_execution(dbtx, execution21).await?;
let (trace12_start, trace12_end) =
insert_swap_execution(dbtx, execution12.as_ref()).await?;
let (trace21_start, trace21_end) =
insert_swap_execution(dbtx, execution21.as_ref()).await?;
sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#)
.bind(i64::try_from(*height)?)
.bind(trace12_start)
Expand Down Expand Up @@ -545,12 +551,12 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
.try_into()?;
let execution12 = pe
.swap_execution_1_for_2
.ok_or(anyhow!("missing swap execution"))?
.try_into()?;
.map(|x| x.try_into())
.transpose()?;
let execution21 = pe
.swap_execution_2_for_1
.ok_or(anyhow!("missing swap execution"))?
.try_into()?;
.map(|x| x.try_into())
.transpose()?;
Ok(Self::BatchSwap {
height,
execution12,
Expand Down

0 comments on commit c94ccd5

Please sign in to comment.