Skip to content

Commit

Permalink
Make generic tests working (#90)
Browse files Browse the repository at this point in the history
* fix generic tests

* error only for prod env

* exract day to use full day
  • Loading branch information
amishas157 authored Sep 13, 2024
1 parent 0425e26 commit da7cb2b
Show file tree
Hide file tree
Showing 19 changed files with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
columns:
Expand Down
2 changes: 1 addition & 1 deletion models/marts/fee_stats_agg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: cast(day_agg as timestamp)
interval: 2
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
columns:
Expand Down
2 changes: 1 addition & 1 deletion models/marts/history_assets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: cast(batch_run_date as timestamp)
interval: 2
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
- incremental_unique_combination_of_columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
- incremental_unique_combination_of_columns:
Expand Down
2 changes: 1 addition & 1 deletion models/marts/ledger_current_state/accounts_current.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
- incremental_unique_combination_of_columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
- incremental_unique_combination_of_columns:
Expand Down
2 changes: 1 addition & 1 deletion models/marts/ledger_current_state/offers_current.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
- incremental_unique_combination_of_columns:
Expand Down
2 changes: 1 addition & 1 deletion models/marts/ledger_current_state/trust_lines_current.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
- incremental_unique_combination_of_columns:
Expand Down
2 changes: 1 addition & 1 deletion models/marts/ledger_current_state/ttl_current.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: closed_at
interval: 12
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
columns:
Expand Down
2 changes: 1 addition & 1 deletion models/marts/trade_agg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ models:
field: cast(day_agg as timestamp)
interval: 2
config:
severity: error
severity: '{{ "error" if target.name == "prod" else "warn" }}'
meta:
description: "Monitors the freshness of your table over time, as the expected time between data updates."
- incremental_unique_combination_of_columns:
Expand Down
1 change: 1 addition & 0 deletions models/staging/stg_history_ledgers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ with
, batch_id
, batch_run_date
, batch_insert_ts
, total_byte_size_of_bucket_list
, '{{ var("airflow_start_timestamp") }}' as airflow_start_ts
from raw_table
)
Expand Down
2 changes: 1 addition & 1 deletion tests/anomaly_detection_trade_count.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ with
from {{ ref('stg_history_trades') }}
where
ledger_closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 90 DAY )
and ledger_closed_at <= '{{ dbt_airflow_macros.ts(timezone=none) }}'
and ledger_closed_at <= timestamp_trunc('{{dbt_airflow_macros.ts(timezone=none) }}', day)
group by close_date
)

Expand Down
4 changes: 2 additions & 2 deletions tests/anomaly_detection_trade_volume.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ with
from {{ ref('stg_history_trades') }}
where
ledger_closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 90 DAY )
and ledger_closed_at <= '{{ dbt_airflow_macros.ts(timezone=none) }}'
and ledger_closed_at <= timestamp_trunc('{{dbt_airflow_macros.ts(timezone=none) }}', day)
group by close_date
)

Expand All @@ -23,7 +23,7 @@ with
from {{ ref('stg_history_trades') }}
where
ledger_closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 90 DAY )
and ledger_closed_at <= '{{ dbt_airflow_macros.ts(timezone=none) }}'
and ledger_closed_at <= timestamp_trunc('{{dbt_airflow_macros.ts(timezone=none) }}', day)
group by close_date
)

Expand Down
4 changes: 2 additions & 2 deletions tests/eho_by_ops.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
-- any id present in the upstream table should be loaded in
-- the downstream. If records are not present, alert the team.
WITH find_missing AS (
SELECT op.id,
SELECT op.op_id,
op.batch_run_date,
op.batch_id
FROM {{ ref('stg_history_operations') }} op
LEFT OUTER JOIN {{ ref('enriched_history_operations') }} eho
ON op.id = eho.op_id
ON op.op_id = eho.op_id
WHERE eho.op_id IS NULL
-- Scan only the last 24 hours of data. Alert runs intraday so failures
-- are caught and resolved quickly.
Expand Down
6 changes: 3 additions & 3 deletions tests/ledger_sequence_increment.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
with
ledger_sequence as (
select
id
ledger_id
, batch_id
, closed_at
, max(sequence) as max_sequence
from {{ ref('stg_history_ledgers') }}
where closed_at > TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 7 DAY )
group by id, batch_id, closed_at
group by ledger_id, batch_id, closed_at
)

, lead_sequence as (
select
id
ledger_id
, batch_id
, closed_at
, max_sequence
Expand Down
8 changes: 4 additions & 4 deletions tests/num_txns_and_ops.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-- and transactions or operations were dropped from the dataset.
-- Get the actual count of transactions per ledger
WITH txn_count AS (
SELECT ledger_sequence, COUNT(id) as txn_transaction_count
SELECT ledger_sequence, COUNT(transaction_id) as txn_transaction_count
FROM {{ ref('stg_history_transactions') }}
--Take all ledgers committed in the last 36 hours to validate newly written data
-- Alert runs at 12pm UTC in GCP which creates the 36 hour interval
Expand All @@ -20,10 +20,10 @@ WITH txn_count AS (
),
-- Get the actual count of operations per ledger
operation_count AS (
SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count
SELECT A.ledger_sequence, COUNT(B.op_id) AS op_operation_count
FROM {{ ref('stg_history_transactions') }} A
JOIN {{ ref('stg_history_operations') }} B
ON A.id = B.transaction_id
ON A.transaction_id = B.transaction_id
WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
GROUP BY A.ledger_sequence
Expand All @@ -32,7 +32,7 @@ WITH txn_count AS (
final_counts AS (
SELECT A.sequence, A.closed_at, A.batch_id,
A.tx_set_operation_count as expected_operation_count,
A.operation_count,
A.ledger_operation_count,
(A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count,
COALESCE(B.txn_transaction_count, 0) as actual_transaction_count,
COALESCE(C.op_operation_count, 0) as actual_operation_count
Expand Down

0 comments on commit da7cb2b

Please sign in to comment.