forked from Greenstand/treetracker-airflow-dags
-
Notifications
You must be signed in to change notification settings - Fork 0
/
populate-reports-schema.py
137 lines (124 loc) · 5.18 KB
/
populate-reports-schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from datetime import datetime, timedelta
from textwrap import dedent
from pprint import pprint
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import psycopg2.extras
from lib.utils import on_failure_callback
# Default arguments applied to every task in this DAG.
# Individual operators may override any of these at construction time.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['x6i4h0c1i4v9l5t6@greenstand.slack.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    # Must be set in default_args (not on the DAG) to fire reliably:
    # https://github.com/apache/airflow/issues/26760
    'on_failure_callback': on_failure_callback,
}
# Daily DAG that refreshes the denormalized reporting schema.
with DAG(
    dag_id='populate-reporting-schema',
    default_args=default_args,
    description='Populate the reporting schema',
    schedule_interval="@daily",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['reporting'],
) as dag:

    # Trivial sanity task: logs the current date on the worker.
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # Airflow connection id consumed by the Postgres hook below.
    postgresConnId = "postgres_default"
def populate_reporting_schema(ds, **kwargs):
db = PostgresHook(postgres_conn_id=postgresConnId)
conn = db.get_conn()
updateCursor = conn.cursor()
cursor = conn.cursor('capture_reporting_cursor', cursor_factory=psycopg2.extras.RealDictCursor)
try:
cursor.execute("""
SELECT
trees.uuid AS capture_uuid,
planter.first_name AS planter_first_name,
planter.last_name AS planter_last_name,
COALESCE(planter.phone, planter.email) AS planter_identifier,
planter.gender AS gender,
trees.time_created AS capture_created_at,
trees.note AS note,
trees.lat AS lat,
trees.lon AS lon,
trees.approved AS approved,
planting_organization.stakeholder_uuid AS planting_organization_uuid,
planting_organization.name AS planting_organization_name,
tree_species.name AS species,
region.name as catchment
FROM trees
JOIN planter
ON planter.id = trees.planter_id
LEFT JOIN entity AS planting_organization
ON planting_organization.id = planter.organization_id
LEFT JOIN tree_species
ON trees.species_id = tree_species.id
LEFT JOIN (
SELECT region.name, region.geom
FROM region
JOIN region_type
ON region_type.id = region.type_id
WHERE region_type.type = 'fcc_catchments'
) region
ON ST_WITHIN(trees.estimated_geometric_location, region.geom)
WHERE trees.active = true
AND (planter_identifier IS NOT NULL or planter_id is not null)
;
""");
print("SQL result:", cursor.query)
updateCursor.execute("""
DELETE FROM reporting.capture_denormalized
""")
for row in cursor:
#do something with every single row here
#optionally print the row
# print(row)
updateCursor.execute("""
INSERT INTO reporting.capture_denormalized
(capture_uuid, planter_first_name, planter_last_name, planter_identifier,
capture_created_at, lat, lon, note, approved,
planting_organization_uuid, planting_organization_name,
species, catchment, gender )
values
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING *
""", (
row['capture_uuid'], row['planter_first_name'], row['planter_last_name'], row['planter_identifier'],
row['capture_created_at'], row['lat'], row['lon'], row['note'], row['approved'],
row['planting_organization_uuid'], row['planting_organization_name'],
row['species'],
row['catchment'], row['gender']
) );
conn.commit()
return 0
except Exception as e:
print("get error when exec SQL:", e)
print("SQL result:", updateCursor.query)
raise ValueError('Error executing query')
populate_reporting_schema = PythonOperator(
task_id='populate_reporting_schema',
python_callable=populate_reporting_schema,
)
populate_reporting_schema >> t1