Skip to content

Commit

Permalink
Merge pull request #99 from Synthetixio/transformers/add-null-ignorin…
Browse files Browse the repository at this point in the history
…g-agg-functions

Transformers - Add FIRST/LAST functions that ignore nulls
  • Loading branch information
Tburm authored Aug 15, 2024
2 parents eb35041 + e7dc532 commit ff804e8
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 335 deletions.
27 changes: 27 additions & 0 deletions postgres/initdb/create_databases.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,34 @@ CREATE database analytics;
-- Read-only account used for querying: the "analytics" login is made a member
-- of the predefined pg_read_all_data role.
-- NOTE(review): the password is a hard-coded literal; presumably this script is
-- only used for local/dev initialisation -- confirm it is not used as-is in
-- any shared environment.
CREATE USER analytics WITH PASSWORD 'analytics';
GRANT pg_read_all_data TO analytics;

-- Switch the psql session to the analytics database so that everything that
-- follows is created there (\c is a psql meta-command, not SQL).
\c analytics

-- Install the parquet foreign data wrapper and register a foreign server that
-- uses it. Both statements carry IF NOT EXISTS so that re-running the script
-- against an already-initialised database does not abort here.
CREATE EXTENSION IF NOT EXISTS parquet_fdw;
CREATE SERVER IF NOT EXISTS parquet_server FOREIGN DATA WRAPPER parquet_fdw;

-- Transition function for the first() aggregate: it returns the running state
-- ($1) and discards the incoming value ($2). Because it is declared STRICT and
-- the aggregate has no initial condition, PostgreSQL seeds the state with the
-- first non-NULL input and skips the call for any NULL input, so the state
-- stays pinned to the first non-NULL value seen.
CREATE OR REPLACE FUNCTION first_agg (anyelement, anyelement)
RETURNS anyelement
LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE AS
'SELECT $1';

-- Aggregate wrapper: first(x) yields the first non-NULL x in input order
-- (NULL when every input is NULL). Callers must supply an ordering (ORDER BY
-- inside the call or in the window definition) for a deterministic result.
-- OR REPLACE keeps this statement re-runnable, matching the function above.
CREATE OR REPLACE AGGREGATE first (anyelement) (
    SFUNC = first_agg,
    STYPE = anyelement,
    PARALLEL = safe
);

-- Transition function for the last() aggregate: it discards the running state
-- ($1) and returns the incoming value ($2). Because it is declared STRICT,
-- PostgreSQL skips the call for any NULL input and keeps the previous state,
-- so the state always holds the most recent non-NULL value seen.
CREATE OR REPLACE FUNCTION last_agg (anyelement, anyelement)
RETURNS anyelement
LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE AS
'SELECT $2';

-- Aggregate wrapper: last(x) yields the last non-NULL x in input order (NULL
-- when every input is NULL). Used as a window aggregate with a frame ending at
-- the current row, it carries the latest non-NULL value forward.
-- OR REPLACE keeps this statement re-runnable, matching the function above.
CREATE OR REPLACE AGGREGATE last (anyelement) (
    SFUNC = last_agg,
    STYPE = anyelement,
    PARALLEL = safe
);
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
) }}

WITH dim AS (

SELECT
generate_series(DATE_TRUNC('hour', MIN(t.ts)), DATE_TRUNC('hour', MAX(t.ts)), '1 hour' :: INTERVAL) AS ts,
p.pool_id,
Expand Down Expand Up @@ -73,26 +72,22 @@ ffill AS (
dim.ts,
dim.pool_id,
dim.collateral_type,
debt.debt,
collateral.collateral_value,
SUM(
CASE
WHEN debt.debt IS NOT NULL THEN 1
ELSE 0
END
) over (
ORDER BY
dim.ts
) AS debt_id,
SUM(
CASE
WHEN collateral.collateral_value IS NOT NULL THEN 1
ELSE 0
END
) over (
ORDER BY
dim.ts
) AS collateral_id
coalesce(
last(debt) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as debt,
coalesce(
last(collateral_value) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as collateral_value
FROM
dim
LEFT JOIN debt
Expand All @@ -104,28 +99,6 @@ ffill AS (
AND dim.pool_id = collateral.pool_id
AND dim.collateral_type = collateral.collateral_type
),
hourly_index AS (
SELECT
ts,
pool_id,
collateral_type,
FIRST_VALUE(COALESCE(debt, 0)) over (
PARTITION BY debt_id,
pool_id,
collateral_type
ORDER BY
ts
) AS debt,
FIRST_VALUE(COALESCE(collateral_value, 0)) over (
PARTITION BY collateral_id,
pool_id,
collateral_type
ORDER BY
ts
) AS collateral_value
FROM
ffill
),
hourly_pnl AS (
SELECT
ts,
Expand All @@ -137,7 +110,7 @@ hourly_pnl AS (
ORDER BY
ts) - debt, 0) AS hourly_pnl
FROM
hourly_index
ffill
),
hourly_rewards AS (
SELECT
Expand Down Expand Up @@ -197,6 +170,7 @@ hourly_returns AS (
iss.collateral_type
)
)

SELECT
ts,
pool_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,11 @@ ffill AS (
SELECT
dim.ts,
dim.market_symbol,
prices.price,
SUM(
CASE
WHEN prices.price IS NOT NULL THEN 1
ELSE 0
END
) over (
PARTITION BY dim.market_symbol
ORDER BY
dim.ts
) AS price_id
last(prices.price) over (
partition by dim.market_symbol
order by dim.ts
rows between unbounded preceding and current row
) as price
FROM
dim
LEFT JOIN prices
Expand All @@ -57,12 +51,7 @@ hourly_prices AS (
SELECT
ts,
market_symbol,
FIRST_VALUE(price) over (
PARTITION BY price_id,
market_symbol
ORDER BY
ts
) AS price
price
FROM
ffill
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
) }}

WITH dim AS (

SELECT
generate_series(DATE_TRUNC('hour', MIN(t.ts)), DATE_TRUNC('hour', MAX(t.ts)), '1 hour' :: INTERVAL) AS ts,
p.pool_id,
Expand Down Expand Up @@ -73,26 +72,22 @@ ffill AS (
dim.ts,
dim.pool_id,
dim.collateral_type,
debt.debt,
collateral.collateral_value,
SUM(
CASE
WHEN debt.debt IS NOT NULL THEN 1
ELSE 0
END
) over (
ORDER BY
dim.ts
) AS debt_id,
SUM(
CASE
WHEN collateral.collateral_value IS NOT NULL THEN 1
ELSE 0
END
) over (
ORDER BY
dim.ts
) AS collateral_id
coalesce(
last(debt) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as debt,
coalesce(
last(collateral_value) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as collateral_value
FROM
dim
LEFT JOIN debt
Expand All @@ -104,28 +99,6 @@ ffill AS (
AND dim.pool_id = collateral.pool_id
AND dim.collateral_type = collateral.collateral_type
),
hourly_index AS (
SELECT
ts,
pool_id,
collateral_type,
FIRST_VALUE(COALESCE(debt, 0)) over (
PARTITION BY debt_id,
pool_id,
collateral_type
ORDER BY
ts
) AS debt,
FIRST_VALUE(COALESCE(collateral_value, 0)) over (
PARTITION BY collateral_id,
pool_id,
collateral_type
ORDER BY
ts
) AS collateral_value
FROM
ffill
),
hourly_pnl AS (
SELECT
ts,
Expand All @@ -137,7 +110,7 @@ hourly_pnl AS (
ORDER BY
ts) - debt, 0) AS hourly_pnl
FROM
hourly_index
ffill
),
hourly_rewards AS (
SELECT
Expand Down Expand Up @@ -197,6 +170,7 @@ hourly_returns AS (
iss.collateral_type
)
)

SELECT
ts,
pool_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,11 @@ ffill AS (
SELECT
dim.ts,
dim.market_symbol,
prices.price,
SUM(
CASE
WHEN prices.price IS NOT NULL THEN 1
ELSE 0
END
) over (
PARTITION BY dim.market_symbol
ORDER BY
dim.ts
) AS price_id
last(prices.price) over (
partition by dim.market_symbol
order by dim.ts
rows between unbounded preceding and current row
) as price
FROM
dim
LEFT JOIN prices
Expand All @@ -57,12 +51,7 @@ hourly_prices AS (
SELECT
ts,
market_symbol,
FIRST_VALUE(price) over (
PARTITION BY price_id,
market_symbol
ORDER BY
ts
) AS price
price
FROM
ffill
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
) }}

WITH dim AS (

SELECT
generate_series(DATE_TRUNC('hour', MIN(t.ts)), DATE_TRUNC('hour', MAX(t.ts)), '1 hour' :: INTERVAL) AS ts,
p.pool_id,
Expand Down Expand Up @@ -73,26 +72,22 @@ ffill AS (
dim.ts,
dim.pool_id,
dim.collateral_type,
debt.debt,
collateral.collateral_value,
SUM(
CASE
WHEN debt.debt IS NOT NULL THEN 1
ELSE 0
END
) over (
ORDER BY
dim.ts
) AS debt_id,
SUM(
CASE
WHEN collateral.collateral_value IS NOT NULL THEN 1
ELSE 0
END
) over (
ORDER BY
dim.ts
) AS collateral_id
coalesce(
last(debt) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as debt,
coalesce(
last(collateral_value) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as collateral_value
FROM
dim
LEFT JOIN debt
Expand All @@ -104,28 +99,6 @@ ffill AS (
AND dim.pool_id = collateral.pool_id
AND dim.collateral_type = collateral.collateral_type
),
hourly_index AS (
SELECT
ts,
pool_id,
collateral_type,
FIRST_VALUE(COALESCE(debt, 0)) over (
PARTITION BY debt_id,
pool_id,
collateral_type
ORDER BY
ts
) AS debt,
FIRST_VALUE(COALESCE(collateral_value, 0)) over (
PARTITION BY collateral_id,
pool_id,
collateral_type
ORDER BY
ts
) AS collateral_value
FROM
ffill
),
hourly_pnl AS (
SELECT
ts,
Expand All @@ -137,7 +110,7 @@ hourly_pnl AS (
ORDER BY
ts) - debt, 0) AS hourly_pnl
FROM
hourly_index
ffill
),
hourly_rewards AS (
SELECT
Expand Down Expand Up @@ -197,6 +170,7 @@ hourly_returns AS (
iss.collateral_type
)
)

SELECT
ts,
pool_id,
Expand Down
Loading

0 comments on commit ff804e8

Please sign in to comment.