-
Notifications
You must be signed in to change notification settings - Fork 105
/
Copy pathrow_count_within_past_partitions_avg.jinja
52 lines (46 loc) · 1.67 KB
/
row_count_within_past_partitions_avg.jinja
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{#
  Data check: raise an error when the row count of the current partition deviates
  too far from the average row count of the preceding partitions.

  Parameters:
    number_of_days        Size of the look-back window in days (see the note on the
                          window bounds inside rows_per_partition).
    threshold_percentage  Allowed deviation from the historic average, in percent.
    partition_field       Name of the query parameter that carries the partition date;
                          falls back to "submission_date" when not supplied.

  The names project_id, dataset_id and table_name are not macro arguments; they are
  read from the surrounding render context.

  Result: a single row. The query fails through ERROR() when the current row count is
  outside [avg * (1 - threshold / 100), avg * (1 + threshold / 100)], otherwise the row
  holds NULL. A current partition that is absent from INFORMATION_SCHEMA.PARTITIONS is
  counted as 0 rows. When no historic partition is found the bounds are NULL and the
  check passes.
#}
{% macro row_count_within_past_partitions_avg(number_of_days, threshold_percentage, partition_field) %}
  {% set partition_field = partition_field | default("submission_date") %}
  WITH rows_per_partition AS (
    {#
      One row per daily partition inside the look-back window.
      SAFE.PARSE_DATE yields NULL instead of aborting the query for partition ids
      that are not YYYYMMDD dates; such rows then drop out through the BETWEEN filter.
      The window is inclusive on both ends, so it holds the current partition plus up
      to number_of_days + 1 earlier ones.
      NOTE(review): confirm whether the extra historic day is intended.
    #}
    SELECT
      SAFE.PARSE_DATE("%Y%m%d", partition_id) AS table_partition,
      total_rows
    FROM
      `{{ project_id }}.{{ dataset_id }}.INFORMATION_SCHEMA.PARTITIONS`
    WHERE
      table_name = "{{ table_name }}"
      AND partition_id != "__NULL__"
      AND SAFE.PARSE_DATE("%Y%m%d", partition_id)
        BETWEEN DATE_SUB(@{{ partition_field }}, INTERVAL {{ number_of_days }} + 1 DAY)
        AND DATE(@{{ partition_field }})
  ),
  row_counts_current_and_historic AS (
    {#
      Collapse the window into the current count and the historic average.
      SUM over no matching rows is NULL, which would make the final comparison NULL
      and let a missing partition pass unnoticed; COALESCE turns that case into 0.
    #}
    SELECT
      COALESCE(
        SUM(IF(table_partition = @{{ partition_field }}, total_rows, NULL)),
        0
      ) AS current_partition_row_count,
      AVG(IF(table_partition < @{{ partition_field }}, total_rows, NULL)) AS historic_partition_avg_row_count
    FROM rows_per_partition
  ),
  row_count_boundaries AS (
    {# Accepted range around the historic average, cast to whole row counts. #}
    SELECT
      CAST(current_partition_row_count AS INT64) AS current_partition_row_count,
      CAST(historic_partition_avg_row_count * (1 - {{ threshold_percentage }} / 100) AS INT64) AS lower_bound,
      CAST(historic_partition_avg_row_count * (1 + {{ threshold_percentage }} / 100) AS INT64) AS upper_bound
    FROM row_counts_current_and_historic
  )
  SELECT
    IF(
      current_partition_row_count NOT BETWEEN lower_bound AND upper_bound,
      ERROR(
        CONCAT(
          "The row count for partition ", @{{ partition_field }}, " is outside of the expected boundaries. ",
          "Row count for the current_partition: ",
          current_partition_row_count,
          ". Expected range: ",
          lower_bound,
          " - ",
          upper_bound
        )
      ),
      NULL
    )
  FROM row_count_boundaries;
{% endmacro %}