-
Notifications
You must be signed in to change notification settings - Fork 170
/
gcs_2_bq_dag.py
84 lines (72 loc) · 2.89 KB
/
gcs_2_bq_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import logging
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator, BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
# --- Environment-driven configuration --------------------------------------
# GCP project and landing bucket come from the environment so the same DAG
# file can be deployed unchanged across environments.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
BUCKET = os.environ.get("GCP_GCS_BUCKET")
# Airflow home on the worker; not referenced elsewhere in this file's
# visible code — presumably kept for parity with sibling DAGs. TODO confirm.
path_to_local_home = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")
# Target BigQuery dataset; defaults to 'trips_data_all' when unset.
BIGQUERY_DATASET = os.environ.get("BIGQUERY_DATASET", 'trips_data_all')
DATASET = "tripdata"
# Maps each taxi type to its pickup-timestamp column; that column becomes
# the partitioning column when the native table is created below.
TAXI_TYPES = {'yellow': 'tpep_pickup_datetime', 'fhv': 'Pickup_datetime', 'green': 'lpep_pickup_datetime'}
#TAXI_TYPES = {'yellow': 'tpep_pickup_datetime'}
# GCS prefix where raw files land before being re-organized per taxi type.
INPUT_PART = "raw"
#INPUT_FILETYPE = "parquet"
# Task-level defaults applied to every task declared in the DAG below.
default_args = {
"owner": "airflow",
"start_date": days_ago(1),
"depends_on_past": False,
"retries": 1,
}
# NOTE: DAG declaration - using a Context Manager (an implicit way)
with DAG(
    dag_id="gcs_2_bq_dag",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    tags=['dtc-de'],
) as dag:
    # One three-step pipeline per taxi type:
    #   1. copy raw files into a per-type GCS prefix,
    #   2. expose that prefix as a BigQuery external table,
    #   3. materialize a date-partitioned native table from it.
    for taxi_type, ds_col in TAXI_TYPES.items():
        # Copy raw/<taxi_type>_* objects under a dedicated <taxi_type>/ prefix
        # so the external table's wildcard URI matches only this taxi type.
        gcs_2_gcs_task = GCSToGCSOperator(
            task_id=f'move_{taxi_type}_{DATASET}_files_task',
            source_bucket=BUCKET,
            source_object=f'{INPUT_PART}/{taxi_type}_*',
            destination_bucket=BUCKET,
            destination_object=f'{taxi_type}/{taxi_type}_',
            move_object=False,  # copy: keep the originals under raw/
        )

        # External table over the per-type Parquet files in GCS.
        gcs_2_bq_ext_task = BigQueryCreateExternalTableOperator(
            task_id=f"bq_{taxi_type}_{DATASET}_external_table_task",
            table_resource={
                "tableReference": {
                    "projectId": PROJECT_ID,
                    "datasetId": BIGQUERY_DATASET,
                    "tableId": f"{taxi_type}_{DATASET}_external_table",
                },
                "externalDataConfiguration": {
                    # BUG FIX: autodetect is a boolean field in the BigQuery
                    # API; the previous string "True" relied on implicit
                    # server-side coercion.
                    "autodetect": True,
                    "sourceFormat": "PARQUET",
                    "sourceUris": [f"gs://{BUCKET}/{taxi_type}/*"],
                },
            },
        )

        # DDL to materialize a native table partitioned by the pickup date.
        # (Whitespace normalized: the old backslash-continued f-string
        # embedded long runs of spaces; SQL semantics are unchanged.)
        CREATE_BQ_TBL_QUERY = (
            f"CREATE OR REPLACE TABLE {BIGQUERY_DATASET}.{taxi_type}_{DATASET} "
            f"PARTITION BY DATE({ds_col}) "
            f"AS "
            f"SELECT * FROM {BIGQUERY_DATASET}.{taxi_type}_{DATASET}_external_table;"
        )

        # Run the DDL as a standard-SQL query job.
        bq_ext_2_part_task = BigQueryInsertJobOperator(
            task_id=f"bq_create_{taxi_type}_{DATASET}_partitioned_table_task",
            configuration={
                "query": {
                    "query": CREATE_BQ_TBL_QUERY,
                    "useLegacySql": False,
                }
            },
        )

        gcs_2_gcs_task >> gcs_2_bq_ext_task >> bq_ext_2_part_task