From da7cb2bdd0c1038b62a7bbd3bfe36494033abff6 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Fri, 13 Sep 2024 20:43:17 +0530 Subject: [PATCH] Make generic tests working (#90) * fix generic tests * error only for prod env * exract day to use full day --- .../enriched_history/enriched_history_operations.yml | 2 +- .../enriched_history_operations_soroban.yml | 2 +- models/marts/fee_stats_agg.yml | 2 +- models/marts/history_assets.yml | 2 +- .../ledger_current_state/account_signers_current.yml | 2 +- models/marts/ledger_current_state/accounts_current.yml | 2 +- .../ledger_current_state/claimable_balances_current.yml | 2 +- .../marts/ledger_current_state/contract_data_current.yml | 2 +- .../ledger_current_state/liquidity_pools_current.yml | 2 +- models/marts/ledger_current_state/offers_current.yml | 2 +- models/marts/ledger_current_state/trust_lines_current.yml | 2 +- models/marts/ledger_current_state/ttl_current.yml | 2 +- models/marts/trade_agg.yml | 2 +- models/staging/stg_history_ledgers.sql | 1 + tests/anomaly_detection_trade_count.sql | 2 +- tests/anomaly_detection_trade_volume.sql | 4 ++-- tests/eho_by_ops.sql | 4 ++-- tests/ledger_sequence_increment.sql | 6 +++--- tests/num_txns_and_ops.sql | 8 ++++---- 19 files changed, 26 insertions(+), 25 deletions(-) diff --git a/models/marts/enriched_history/enriched_history_operations.yml b/models/marts/enriched_history/enriched_history_operations.yml index dde222a..11aca57 100644 --- a/models/marts/enriched_history/enriched_history_operations.yml +++ b/models/marts/enriched_history/enriched_history_operations.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/enriched_history/enriched_history_operations_soroban.yml b/models/marts/enriched_history/enriched_history_operations_soroban.yml index b6c55ec..5b19d29 100644 --- a/models/marts/enriched_history/enriched_history_operations_soroban.yml +++ b/models/marts/enriched_history/enriched_history_operations_soroban.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/fee_stats_agg.yml b/models/marts/fee_stats_agg.yml index 556109d..ed0ca51 100644 --- a/models/marts/fee_stats_agg.yml +++ b/models/marts/fee_stats_agg.yml @@ -9,7 +9,7 @@ models: field: cast(day_agg as timestamp) interval: 2 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/history_assets.yml b/models/marts/history_assets.yml index ff5b97b..54886ff 100644 --- a/models/marts/history_assets.yml +++ b/models/marts/history_assets.yml @@ -9,7 +9,7 @@ models: field: cast(batch_run_date as timestamp) interval: 2 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/account_signers_current.yml b/models/marts/ledger_current_state/account_signers_current.yml index 1489ec9..f27e2a4 100644 --- a/models/marts/ledger_current_state/account_signers_current.yml +++ b/models/marts/ledger_current_state/account_signers_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/accounts_current.yml b/models/marts/ledger_current_state/accounts_current.yml index ed3c0ef..43280f6 100644 --- a/models/marts/ledger_current_state/accounts_current.yml +++ b/models/marts/ledger_current_state/accounts_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/claimable_balances_current.yml b/models/marts/ledger_current_state/claimable_balances_current.yml index 4fa8a9c..212586f 100644 --- a/models/marts/ledger_current_state/claimable_balances_current.yml +++ b/models/marts/ledger_current_state/claimable_balances_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/ledger_current_state/contract_data_current.yml b/models/marts/ledger_current_state/contract_data_current.yml index e73b9e8..082205e 100644 --- a/models/marts/ledger_current_state/contract_data_current.yml +++ b/models/marts/ledger_current_state/contract_data_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/ledger_current_state/liquidity_pools_current.yml b/models/marts/ledger_current_state/liquidity_pools_current.yml index 2554aff..042dde8 100644 --- a/models/marts/ledger_current_state/liquidity_pools_current.yml +++ b/models/marts/ledger_current_state/liquidity_pools_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/offers_current.yml b/models/marts/ledger_current_state/offers_current.yml index 2299df5..186db0b 100644 --- a/models/marts/ledger_current_state/offers_current.yml +++ b/models/marts/ledger_current_state/offers_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/trust_lines_current.yml b/models/marts/ledger_current_state/trust_lines_current.yml index da604a3..88eaa64 100644 --- a/models/marts/ledger_current_state/trust_lines_current.yml +++ b/models/marts/ledger_current_state/trust_lines_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/ttl_current.yml b/models/marts/ledger_current_state/ttl_current.yml index 48d981e..a4430a5 100644 --- a/models/marts/ledger_current_state/ttl_current.yml +++ b/models/marts/ledger_current_state/ttl_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/trade_agg.yml b/models/marts/trade_agg.yml index f34e96b..8963fe1 100644 --- a/models/marts/trade_agg.yml +++ b/models/marts/trade_agg.yml @@ -9,7 +9,7 @@ models: field: cast(day_agg as timestamp) interval: 2 config: - severity: error + severity: '{{ "error" if target.name == "prod" else "warn" }}' meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/staging/stg_history_ledgers.sql b/models/staging/stg_history_ledgers.sql index 6b6315e..362c64c 100644 --- a/models/staging/stg_history_ledgers.sql +++ b/models/staging/stg_history_ledgers.sql @@ -34,6 +34,7 @@ with , batch_id , batch_run_date , batch_insert_ts + , total_byte_size_of_bucket_list , '{{ var("airflow_start_timestamp") }}' as airflow_start_ts from raw_table ) diff --git a/tests/anomaly_detection_trade_count.sql b/tests/anomaly_detection_trade_count.sql index 333e5fa..78594f4 100644 --- a/tests/anomaly_detection_trade_count.sql +++ b/tests/anomaly_detection_trade_count.sql @@ -12,7 +12,7 @@ with from {{ ref('stg_history_trades') }} where ledger_closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 90 DAY ) - and ledger_closed_at <= '{{ dbt_airflow_macros.ts(timezone=none) }}' + and ledger_closed_at <= timestamp_trunc('{{dbt_airflow_macros.ts(timezone=none) }}', day) group by close_date ) diff --git a/tests/anomaly_detection_trade_volume.sql b/tests/anomaly_detection_trade_volume.sql index 640a175..0814dde 100644 --- a/tests/anomaly_detection_trade_volume.sql +++ b/tests/anomaly_detection_trade_volume.sql @@ -12,7 +12,7 @@ with from {{ ref('stg_history_trades') }} where ledger_closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 90 DAY ) - and ledger_closed_at <= '{{ dbt_airflow_macros.ts(timezone=none) }}' + and ledger_closed_at <= timestamp_trunc('{{dbt_airflow_macros.ts(timezone=none) }}', day) group by close_date ) @@ -23,7 +23,7 @@ with from {{ ref('stg_history_trades') }} where ledger_closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 90 DAY ) - and ledger_closed_at <= '{{ dbt_airflow_macros.ts(timezone=none) }}' + and ledger_closed_at <= timestamp_trunc('{{dbt_airflow_macros.ts(timezone=none) }}', day) group by close_date ) diff --git a/tests/eho_by_ops.sql b/tests/eho_by_ops.sql index 6691f4b..250b527 100644 --- a/tests/eho_by_ops.sql +++ b/tests/eho_by_ops.sql @@ -9,12 +9,12 @@ -- any id present in the upstream table should be loaded in -- the downstream. If records are not present, alert the team. WITH find_missing AS ( - SELECT op.id, + SELECT op.op_id, op.batch_run_date, op.batch_id FROM {{ ref('stg_history_operations') }} op LEFT OUTER JOIN {{ ref('enriched_history_operations') }} eho - ON op.id = eho.op_id + ON op.op_id = eho.op_id WHERE eho.op_id IS NULL -- Scan only the last 24 hours of data. Alert runs intraday so failures -- are caught and resolved quickly. diff --git a/tests/ledger_sequence_increment.sql b/tests/ledger_sequence_increment.sql index 59c72ef..8ca36e1 100644 --- a/tests/ledger_sequence_increment.sql +++ b/tests/ledger_sequence_increment.sql @@ -7,18 +7,18 @@ with ledger_sequence as ( select - id + ledger_id , batch_id , closed_at , max(sequence) as max_sequence from {{ ref('stg_history_ledgers') }} where closed_at > TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 7 DAY ) - group by id, batch_id, closed_at + group by ledger_id, batch_id, closed_at ) , lead_sequence as ( select - id + ledger_id , batch_id , closed_at , max_sequence diff --git a/tests/num_txns_and_ops.sql b/tests/num_txns_and_ops.sql index 06d161f..1f8c823 100644 --- a/tests/num_txns_and_ops.sql +++ b/tests/num_txns_and_ops.sql @@ -11,7 +11,7 @@ -- and transactions or operations were dropped from the dataset. -- Get the actual count of transactions per ledger WITH txn_count AS ( - SELECT ledger_sequence, COUNT(id) as txn_transaction_count + SELECT ledger_sequence, COUNT(transaction_id) as txn_transaction_count FROM {{ ref('stg_history_transactions') }} --Take all ledgers committed in the last 36 hours to validate newly written data -- Alert runs at 12pm UTC in GCP which creates the 36 hour interval @@ -20,10 +20,10 @@ WITH txn_count AS ( ), -- Get the actual count of operations per ledger operation_count AS ( - SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count + SELECT A.ledger_sequence, COUNT(B.op_id) AS op_operation_count FROM {{ ref('stg_history_transactions') }} A JOIN {{ ref('stg_history_operations') }} B - ON A.id = B.transaction_id + ON A.transaction_id = B.transaction_id WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) GROUP BY A.ledger_sequence @@ -32,7 +32,7 @@ WITH txn_count AS ( final_counts AS ( SELECT A.sequence, A.closed_at, A.batch_id, A.tx_set_operation_count as expected_operation_count, - A.operation_count, + A.ledger_operation_count, (A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count, COALESCE(B.txn_transaction_count, 0) as actual_transaction_count, COALESCE(C.op_operation_count, 0) as actual_operation_count