From 88d0fed05cb214969d136fa6c078e2b661a5a64f Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Tue, 3 Sep 2024 16:22:39 +0530 Subject: [PATCH 1/9] Move tests from scheduled queries --- tests/bucketlist_db_size_check.sql | 27 +++++++++++++++++++++++++++ tests/sorobon_surge_pricing_check.sql | 27 +++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 tests/bucketlist_db_size_check.sql create mode 100644 tests/sorobon_surge_pricing_check.sql diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql new file mode 100644 index 0000000..4e04b34 --- /dev/null +++ b/tests/bucketlist_db_size_check.sql @@ -0,0 +1,27 @@ +{{ config( + severity="error" + , tags=["singular_test"] + ) +}} + +with bucketlist_db_size as ( + select sequence, + closed_at, + total_byte_size_of_bucket_list / 1000000000 as bl_db_gb + from `crypto-stellar.crypto_stellar.history_ledgers` + where closed_at >= current_timestamp() - interval 1 hour + -- alert when the bucketlist has grown larger than 12 gb + and total_byte_size_of_bucket_list / 1000000000 >= 12 +) + +select +if ( + (select count(*) + from bucketlist_db_size + ) = 0, + --Return when True: + 'Bucketlist is under 12GB', + -- Return Alert when False: + ERROR('BucketlistDB size has exceeded 12GB!')) + ; + select true; diff --git a/tests/sorobon_surge_pricing_check.sql b/tests/sorobon_surge_pricing_check.sql new file mode 100644 index 0000000..2b83a98 --- /dev/null +++ b/tests/sorobon_surge_pricing_check.sql @@ -0,0 +1,27 @@ +{{ config( + severity="warn" + , tags=["singular_test"] + ) +}} + +with surge_pricing_check as ( + select inclusion_fee_charged, + ledger_sequence, + closed_at +from `crypto-stellar.crypto_stellar_dbt.enriched_history_operations_soroban` +where closed_at >= current_timestamp - interval 1 hour + -- inclusion fees over 100 stroops indicate surge pricing on the network + and inclusion_fee_charged > 100 +) + +select +if ( + (select count(*) + from surge_pricing_check + ) = 0, + --Return when True: + 'Network is in normal pricing', + -- Return Alert when False: + ERROR('Network has entered surge pricing for Soroban')) + ; + select true; From a05afc4934efada5dd53ab78a0b2a2bdf7ee2c0e Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Tue, 3 Sep 2024 21:50:34 +0530 Subject: [PATCH 2/9] Move tests from scheduled queries and set freshness check to error instead of warning --- .../enriched_history/enriched_history_operations.yml | 2 +- .../enriched_history_operations_soroban.yml | 2 +- models/marts/fee_stats_agg.yml | 2 +- models/marts/history_assets.yml | 10 +++++++++- .../ledger_current_state/account_signers_current.yml | 2 +- models/marts/ledger_current_state/accounts_current.yml | 2 +- .../claimable_balances_current.yml | 2 +- .../ledger_current_state/contract_data_current.yml | 2 +- .../ledger_current_state/liquidity_pools_current.yml | 2 +- models/marts/ledger_current_state/offers_current.yml | 2 +- .../marts/ledger_current_state/trust_lines_current.yml | 2 +- models/marts/ledger_current_state/ttl_current.yml | 2 +- models/marts/trade_agg.yml | 2 +- models/sources/src_accounts.yml | 1 - 14 files changed, 21 insertions(+), 14 deletions(-) diff --git a/models/marts/enriched_history/enriched_history_operations.yml b/models/marts/enriched_history/enriched_history_operations.yml index f807d85..29738bf 100644 --- a/models/marts/enriched_history/enriched_history_operations.yml +++ b/models/marts/enriched_history/enriched_history_operations.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/enriched_history/enriched_history_operations_soroban.yml b/models/marts/enriched_history/enriched_history_operations_soroban.yml index f5f66e9..9a8d9f9 100644 --- a/models/marts/enriched_history/enriched_history_operations_soroban.yml +++ b/models/marts/enriched_history/enriched_history_operations_soroban.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/fee_stats_agg.yml b/models/marts/fee_stats_agg.yml index 7b8e200..556109d 100644 --- a/models/marts/fee_stats_agg.yml +++ b/models/marts/fee_stats_agg.yml @@ -9,7 +9,7 @@ models: field: cast(day_agg as timestamp) interval: 2 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/history_assets.yml b/models/marts/history_assets.yml index 4c0a7a6..73522ac 100644 --- a/models/marts/history_assets.yml +++ b/models/marts/history_assets.yml @@ -9,9 +9,17 @@ models: field: cast(batch_run_date as timestamp) interval: 2 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." + - incremental_unique_combination_of_columns: + combination_of_columns: + - batch_run_date + - asset_type + - asset_code + - asset_issuer + date_column_name: "batch_run_date" + greater_than_equal_to: "2 day" columns: - name: asset_id description: '{{ doc("assets_id") }}' diff --git a/models/marts/ledger_current_state/account_signers_current.yml b/models/marts/ledger_current_state/account_signers_current.yml index cd19957..5108298 100644 --- a/models/marts/ledger_current_state/account_signers_current.yml +++ b/models/marts/ledger_current_state/account_signers_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/accounts_current.yml b/models/marts/ledger_current_state/accounts_current.yml index 80f66c7..de962f3 100644 --- a/models/marts/ledger_current_state/accounts_current.yml +++ b/models/marts/ledger_current_state/accounts_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/claimable_balances_current.yml b/models/marts/ledger_current_state/claimable_balances_current.yml index ca9d406..7afeab1 100644 --- a/models/marts/ledger_current_state/claimable_balances_current.yml +++ b/models/marts/ledger_current_state/claimable_balances_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/ledger_current_state/contract_data_current.yml b/models/marts/ledger_current_state/contract_data_current.yml index 3443f8c..952f62f 100644 --- a/models/marts/ledger_current_state/contract_data_current.yml +++ b/models/marts/ledger_current_state/contract_data_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/ledger_current_state/liquidity_pools_current.yml b/models/marts/ledger_current_state/liquidity_pools_current.yml index 39c6567..b256305 100644 --- a/models/marts/ledger_current_state/liquidity_pools_current.yml +++ b/models/marts/ledger_current_state/liquidity_pools_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/offers_current.yml b/models/marts/ledger_current_state/offers_current.yml index b5dd807..3f0573c 100644 --- a/models/marts/ledger_current_state/offers_current.yml +++ b/models/marts/ledger_current_state/offers_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/trust_lines_current.yml b/models/marts/ledger_current_state/trust_lines_current.yml index e780478..fadd38c 100644 --- a/models/marts/ledger_current_state/trust_lines_current.yml +++ b/models/marts/ledger_current_state/trust_lines_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/ttl_current.yml b/models/marts/ledger_current_state/ttl_current.yml index 32013e5..e131d84 100644 --- a/models/marts/ledger_current_state/ttl_current.yml +++ b/models/marts/ledger_current_state/ttl_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/trade_agg.yml b/models/marts/trade_agg.yml index 2847a89..f34e96b 100644 --- a/models/marts/trade_agg.yml +++ b/models/marts/trade_agg.yml @@ -9,7 +9,7 @@ models: field: cast(day_agg as timestamp) interval: 2 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/sources/src_accounts.yml b/models/sources/src_accounts.yml index 3d89ba0..324ca5c 100644 --- a/models/sources/src_accounts.yml +++ b/models/sources/src_accounts.yml @@ -14,7 +14,6 @@ sources: - incremental_unique_combination_of_columns: combination_of_columns: - account_id - - sequence_number - ledger_entry_change - last_modified_ledger date_column_name: "batch_run_date" From 8dc3b54bc8d6409b9a1fa7dd9ac0d77961d2795d Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Sep 2024 13:14:43 +0530 Subject: [PATCH 3/9] Update generic tests --- tests/bucketlist_db_size_check.sql | 14 ++------------ tests/sorobon_surge_pricing_check.sql | 14 ++------------ 2 files changed, 4 insertions(+), 24 deletions(-) diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql index 4e04b34..4aa25f1 100644 --- a/tests/bucketlist_db_size_check.sql +++ b/tests/bucketlist_db_size_check.sql @@ -9,19 +9,9 @@ with bucketlist_db_size as ( closed_at, total_byte_size_of_bucket_list / 1000000000 as bl_db_gb from `crypto-stellar.crypto_stellar.history_ledgers` - where closed_at >= current_timestamp() - interval 1 hour + where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) -- alert when the bucketlist has grown larger than 12 gb and total_byte_size_of_bucket_list / 1000000000 >= 12 ) -select -if ( - (select count(*) - from bucketlist_db_size - ) = 0, - --Return when True: - 'Bucketlist is under 12GB', - -- Return Alert when False: - ERROR('BucketlistDB size has exceeded 12GB!')) - ; - select true; +select * from bucketlist_db_size; diff --git a/tests/sorobon_surge_pricing_check.sql b/tests/sorobon_surge_pricing_check.sql index 2b83a98..8eabff7 100644 --- a/tests/sorobon_surge_pricing_check.sql +++ b/tests/sorobon_surge_pricing_check.sql @@ -9,19 +9,9 @@ with surge_pricing_check as ( ledger_sequence, closed_at from `crypto-stellar.crypto_stellar_dbt.enriched_history_operations_soroban` -where closed_at >= current_timestamp - interval 1 hour +where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) -- inclusion fees over 100 stroops indicate surge pricing on the network and inclusion_fee_charged > 100 ) -select -if ( - (select count(*) - from surge_pricing_check - ) = 0, - --Return when True: - 'Network is in normal pricing', - -- Return Alert when False: - ERROR('Network has entered surge pricing for Soroban')) - ; - select true; +select * from surge_pricing_check; From acfc893b5709802a703c12aade4bf48fb5512e51 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Sep 2024 13:26:18 +0530 Subject: [PATCH 4/9] remove trailing whitespace --- tests/bucketlist_db_size_check.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql index 4aa25f1..06155d8 100644 --- a/tests/bucketlist_db_size_check.sql +++ b/tests/bucketlist_db_size_check.sql @@ -11,7 +11,7 @@ with bucketlist_db_size as ( from `crypto-stellar.crypto_stellar.history_ledgers` where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) -- alert when the bucketlist has grown larger than 12 gb - and total_byte_size_of_bucket_list / 1000000000 >= 12 + and total_byte_size_of_bucket_list / 1000000000 >= 12 ) select * from bucketlist_db_size; From c62a9476c5b9f5332185d442a1da8e5f33907787 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Sep 2024 14:31:03 +0530 Subject: [PATCH 5/9] Move adhoc business queries lint --- tests/bucketlist_db_size_check.sql | 2 +- tests/eho_by_ops.sql | 36 +++++++++++++++ tests/num_txns_and_ops.sql | 65 +++++++++++++++++++++++++++ tests/sorobon_surge_pricing_check.sql | 2 +- 4 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 tests/eho_by_ops.sql create mode 100644 tests/num_txns_and_ops.sql diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql index 06155d8..a16e42f 100644 --- a/tests/bucketlist_db_size_check.sql +++ b/tests/bucketlist_db_size_check.sql @@ -8,7 +8,7 @@ with bucketlist_db_size as ( select sequence, closed_at, total_byte_size_of_bucket_list / 1000000000 as bl_db_gb - from `crypto-stellar.crypto_stellar.history_ledgers` + from {{ source('crypto_stellar', 'history_ledgers') }} where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) -- alert when the bucketlist has grown larger than 12 gb and total_byte_size_of_bucket_list / 1000000000 >= 12 diff --git a/tests/eho_by_ops.sql b/tests/eho_by_ops.sql new file mode 100644 index 0000000..efdf2dc --- /dev/null +++ b/tests/eho_by_ops.sql @@ -0,0 +1,36 @@ +{{ config( + severity="error" + , tags=["singular_test"] + ) +}} + +-- Enriched_history_operations table is dependent on the +-- history_operations table to load. It is assumed that +-- any id present in the upstream table should be loaded in +-- the downstream. If records are not present, alert the team. +WITH find_missing AS ( + SELECT op.id, + op.batch_run_date, + op.batch_id + FROM {{ source('crypto_stellar', 'history_operations') }} op + LEFT OUTER JOIN {{ source('crypto_stellar', 'enriched_history_operations') }} eho + ON op.id = eho.op_id + WHERE eho.op_id IS NULL + -- Scan only the last 24 hours of data. Alert runs intraday so failures + -- are caught and resolved quickly. + AND TIMESTAMP(op.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) +), +find_max_batch AS ( + SELECT MAX(batch_run_date) AS max_batch + FROM {{ source('crypto_stellar', 'history_operations') }} + WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) +) +SELECT batch_run_date, + batch_id, + count(*) +FROM find_missing +-- Account for delay in loading history_operations table prior to +-- enriched_history_operations table being loaded. +WHERE batch_run_date != (SELECT max_batch FROM find_max_batch) +GROUP BY 1, 2 +ORDER BY 1 diff --git a/tests/num_txns_and_ops.sql b/tests/num_txns_and_ops.sql new file mode 100644 index 0000000..6aa6af8 --- /dev/null +++ b/tests/num_txns_and_ops.sql @@ -0,0 +1,65 @@ +{{ config( + severity="error" + , tags=["singular_test"] + ) +}} + +-- Query studies the number of reported transactions and operations +-- reported and committed per ledger in history_ledgers with the +-- actual transaction count and operation count in the ledger. +-- If the counts mismatch, there was a batch processing error +-- and transactions or operations were dropped from the dataset. +-- Get the actual count of transactions per ledger +WITH txn_count AS ( + SELECT ledger_sequence, COUNT(id) as txn_transaction_count + FROM {{ source('crypto_stellar', 'history_transactions') }} + --Take all ledgers committed in the last 36 hours to validate newly written data + -- Alert runs at 12pm UTC in GCP which creates the 36 hour interval + WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + GROUP BY ledger_sequence +), +-- Get the actual count of operations per ledger + operation_count AS ( + SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count + FROM {{ source('crypto_stellar', 'history_transactions') }} A + JOIN {{ source('crypto_stellar', 'history_operations') }} B + ON A.id = B.transaction_id + WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + GROUP BY A.ledger_sequence + ), +-- compare actual counts with the counts reported in the ledgers table + final_counts AS ( + SELECT A.sequence, A.closed_at, A.batch_id, + A.tx_set_operation_count as expected_operation_count, + A.operation_count, + (A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count, + COALESCE(B.txn_transaction_count, 0) as actual_transaction_count, + COALESCE(C.op_operation_count, 0) as actual_operation_count + FROM {{ source('crypto_stellar', 'history_ledgers') }} A + LEFT OUTER JOIN txn_count B + ON A.sequence = B.ledger_sequence + LEFT OUTER JOIN operation_count C + ON A.sequence = C.ledger_sequence + WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + ) + , raw_values AS ( + SELECT sequence, closed_at, batch_id, + expected_transaction_count, actual_transaction_count, + expected_operation_count, actual_operation_count + FROM final_counts + WHERE + ((expected_transaction_count <> actual_transaction_count) + OR (expected_operation_count <> actual_operation_count)) +) +SELECT batch_id, + SUM(expected_transaction_count) as exp_txn_count, + SUM(actual_transaction_count ) as actual_txn_count, + SUM(expected_operation_count ) as exp_op_count, + SUM(actual_operation_count ) as actual_op_count +FROM raw_values +--@TODO: figure out a more precise delay for ledgers. Since tables are loaded on a 15-30 min delay, +-- we do not want a premature alert to row count mismatches when it could be loading latency +WHERE closed_at <= TIMESTAMP_ADD('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL -180 MINUTE ) +GROUP BY batch_id +ORDER BY batch_id diff --git a/tests/sorobon_surge_pricing_check.sql b/tests/sorobon_surge_pricing_check.sql index 8eabff7..ffc0785 100644 --- a/tests/sorobon_surge_pricing_check.sql +++ b/tests/sorobon_surge_pricing_check.sql @@ -8,7 +8,7 @@ with surge_pricing_check as ( select inclusion_fee_charged, ledger_sequence, closed_at -from `crypto-stellar.crypto_stellar_dbt.enriched_history_operations_soroban` +from {{ ref('enriched_history_operations_soroban') }} where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) -- inclusion fees over 100 stroops indicate surge pricing on the network and inclusion_fee_charged > 100 From a86ea706fd15a548eb7f490e934a4a3a821db0b8 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Sep 2024 20:38:37 +0530 Subject: [PATCH 6/9] remove semicolon --- tests/bucketlist_db_size_check.sql | 2 +- tests/sorobon_surge_pricing_check.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql index a16e42f..568213f 100644 --- a/tests/bucketlist_db_size_check.sql +++ b/tests/bucketlist_db_size_check.sql @@ -14,4 +14,4 @@ with bucketlist_db_size as ( and total_byte_size_of_bucket_list / 1000000000 >= 12 ) -select * from bucketlist_db_size; +select * from bucketlist_db_size diff --git a/tests/sorobon_surge_pricing_check.sql b/tests/sorobon_surge_pricing_check.sql index ffc0785..6374b9b 100644 --- a/tests/sorobon_surge_pricing_check.sql +++ b/tests/sorobon_surge_pricing_check.sql @@ -14,4 +14,4 @@ where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', I and inclusion_fee_charged > 100 ) -select * from surge_pricing_check; +select * from surge_pricing_check From 228f0dc7e598ab9525dcc2d7b937ccdcb7ec8f0e Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Sep 2024 23:16:22 +0530 Subject: [PATCH 7/9] Fix the reference --- tests/eho_by_ops.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/eho_by_ops.sql b/tests/eho_by_ops.sql index efdf2dc..8d33947 100644 --- a/tests/eho_by_ops.sql +++ b/tests/eho_by_ops.sql @@ -13,7 +13,7 @@ WITH find_missing AS ( op.batch_run_date, op.batch_id FROM {{ source('crypto_stellar', 'history_operations') }} op - LEFT OUTER JOIN {{ source('crypto_stellar', 'enriched_history_operations') }} eho + LEFT OUTER JOIN {{ ref('enriched_history_operations') }} eho ON op.id = eho.op_id WHERE eho.op_id IS NULL -- Scan only the last 24 hours of data. Alert runs intraday so failures From 6bd6abf411f043b3e30af1bfa1780741b9d9cac1 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Thu, 12 Sep 2024 16:02:32 +0530 Subject: [PATCH 8/9] feedback --- models/marts/history_assets.yml | 1 - models/sources/src_accounts.yml | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/models/marts/history_assets.yml b/models/marts/history_assets.yml index 73522ac..0050a26 100644 --- a/models/marts/history_assets.yml +++ b/models/marts/history_assets.yml @@ -14,7 +14,6 @@ models: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: combination_of_columns: - - batch_run_date - asset_type - asset_code - asset_issuer diff --git a/models/sources/src_accounts.yml b/models/sources/src_accounts.yml index 324ca5c..3d89ba0 100644 --- a/models/sources/src_accounts.yml +++ b/models/sources/src_accounts.yml @@ -14,6 +14,7 @@ sources: - incremental_unique_combination_of_columns: combination_of_columns: - account_id + - sequence_number - ledger_entry_change - last_modified_ledger date_column_name: "batch_run_date" From 74dad5a47aa2b7f60aed1239f356c5697d7ba1e3 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Thu, 12 Sep 2024 23:23:00 +0530 Subject: [PATCH 9/9] Use staging tables in test instead of source to handle test env --- tests/bucketlist_db_size_check.sql | 2 +- tests/eho_by_ops.sql | 4 ++-- tests/ledger_sequence_increment.sql | 2 +- tests/num_txns_and_ops.sql | 8 ++++---- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql index 568213f..a1d869b 100644 --- a/tests/bucketlist_db_size_check.sql +++ b/tests/bucketlist_db_size_check.sql @@ -8,7 +8,7 @@ with bucketlist_db_size as ( select sequence, closed_at, total_byte_size_of_bucket_list / 1000000000 as bl_db_gb - from {{ source('crypto_stellar', 'history_ledgers') }} + from {{ ref('stg_history_ledgers') }} where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) -- alert when the bucketlist has grown larger than 12 gb and total_byte_size_of_bucket_list / 1000000000 >= 12 diff --git a/tests/eho_by_ops.sql b/tests/eho_by_ops.sql index 8d33947..6691f4b 100644 --- a/tests/eho_by_ops.sql +++ b/tests/eho_by_ops.sql @@ -12,7 +12,7 @@ WITH find_missing AS ( SELECT op.id, op.batch_run_date, op.batch_id - FROM {{ source('crypto_stellar', 'history_operations') }} op + FROM {{ ref('stg_history_operations') }} op LEFT OUTER JOIN {{ ref('enriched_history_operations') }} eho ON op.id = eho.op_id WHERE eho.op_id IS NULL @@ -22,7 +22,7 @@ WITH find_missing AS ( ), find_max_batch AS ( SELECT MAX(batch_run_date) AS max_batch - FROM {{ source('crypto_stellar', 'history_operations') }} + FROM {{ ref('stg_history_operations') }} WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) ) SELECT batch_run_date, diff --git a/tests/ledger_sequence_increment.sql b/tests/ledger_sequence_increment.sql index d1ce9e9..59c72ef 100644 --- a/tests/ledger_sequence_increment.sql +++ b/tests/ledger_sequence_increment.sql @@ -11,7 +11,7 @@ with , batch_id , closed_at , max(sequence) as max_sequence - from {{ source('crypto_stellar', 'history_ledgers') }} + from {{ ref('stg_history_ledgers') }} where closed_at > TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 7 DAY ) group by id, batch_id, closed_at ) diff --git a/tests/num_txns_and_ops.sql b/tests/num_txns_and_ops.sql index 6aa6af8..06d161f 100644 --- a/tests/num_txns_and_ops.sql +++ b/tests/num_txns_and_ops.sql @@ -12,7 +12,7 @@ -- Get the actual count of transactions per ledger WITH txn_count AS ( SELECT ledger_sequence, COUNT(id) as txn_transaction_count - FROM {{ source('crypto_stellar', 'history_transactions') }} + FROM {{ ref('stg_history_transactions') }} --Take all ledgers committed in the last 36 hours to validate newly written data -- Alert runs at 12pm UTC in GCP which creates the 36 hour interval WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) @@ -21,8 +21,8 @@ WITH txn_count AS ( -- Get the actual count of operations per ledger operation_count AS ( SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count - FROM {{ source('crypto_stellar', 'history_transactions') }} A - JOIN {{ source('crypto_stellar', 'history_operations') }} B + FROM {{ ref('stg_history_transactions') }} A + JOIN {{ ref('stg_history_operations') }} B ON A.id = B.transaction_id WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) @@ -36,7 +36,7 @@ WITH txn_count AS ( (A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count, COALESCE(B.txn_transaction_count, 0) as actual_transaction_count, COALESCE(C.op_operation_count, 0) as actual_operation_count - FROM {{ source('crypto_stellar', 'history_ledgers') }} A + FROM {{ ref('stg_history_ledgers') }} A LEFT OUTER JOIN txn_count B ON A.sequence = B.ledger_sequence LEFT OUTER JOIN operation_count C