Adding sample Python DAG and gcs_to_gcs operator as plugin (GoogleCloudPlatform#1678)

* Add the gcs_to_gcs operator from current Airflow master as a plugin, because it is not available in the Airflow version currently used by Composer (1.9.0)
* Author the DAG, including a sample master CSV file and a test: 1. uses the gcs_to_gcs operator from plugins; 2. uses Cloud Logging features; 3. dynamically generates tasks based on the master CSV file
* Fix PEP8 warnings
* Change the sample tables to copy, using NYC Taxi trips
* Replace ":" with a valid character in Airflow task IDs; enable exporting and importing multiple Avro files
* The GCS hook must be downloaded from the Airflow repository
* Rename the config file to "table_list_file_path"
* Remove unnecessary logging code
* Refactor "master" to "table_file" in code and filenames
* Add the gcs_to_gcs module from Airflow 1.10 as third party before adapting it for the Composer DAG as a plugin
* Wrap lines in the gcs_to_gcs module from Airflow 1.10 and import the hook from plugins
* Add notes on how to install the module
* Remove the gcs_to_gcs module after moving it to the third_party folder
* Refactor the test and include an actual sample table list file
* Add a license and instructions in the third_party folder for the hooks and operators
* Fix tests and update CSV parsing for Python 3 compatibility. I moved the plugins directory to third_party to keep nox from running the linter and tests on Apache Airflow code.
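For context: the DAG below imports the backported operator with "from gcs_plugin.operators import gcs_to_gcs", and the test adds third_party/apache-airflow/plugins to sys.path. A minimal sketch of the layout this implies follows; the package and module names are inferred from those imports rather than taken from the diff, so treat them as assumptions.

# Assumed layout of the backported plugin (inferred from the imports in the
# DAG and test below; the actual file names in the commit may differ):
#
#   third_party/apache-airflow/plugins/
#       gcs_plugin/
#           __init__.py
#           hooks/
#               __init__.py
#               gcs_hook.py      # GoogleCloudStorageHook copied from Airflow 1.10
#           operators/
#               __init__.py
#               gcs_to_gcs.py    # GoogleCloudStorageToGoogleCloudStorageOperator
#
# In Cloud Composer the same gcs_plugin package would be uploaded to the
# environment's plugins/ folder, which Airflow places on sys.path, so the DAG
# can import the operator directly:
from gcs_plugin.operators import gcs_to_gcs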
Showing 14 changed files with 1,274 additions and 4 deletions.
New file (178 lines): bq_copy_across_locations.py, the example Composer DAG
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example Airflow DAG that exports the BQ tables listed in a config file to
GCS, copies the GCS objects across locations (e.g., from US to EU), then
imports them from GCS back into BQ. The DAG imports the gcs_to_gcs operator
from plugins and dynamically builds the tasks based on the list of tables.
Lastly, the DAG defines a specific application logger to generate logs.

This DAG relies on three Airflow variables
(https://airflow.apache.org/concepts.html#variables):
* table_list_file_path - CSV file listing source and target tables, including
  datasets.
* gcs_source_bucket - Google Cloud Storage bucket used to export the
  BigQuery tables in the source location.
* gcs_dest_bucket - Google Cloud Storage bucket used to import the
  BigQuery tables in the destination location.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""

# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------

import csv
import datetime
import io
import logging

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.operators import dummy_operator
# Import operator from plugins
from gcs_plugin.operators import gcs_to_gcs


# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime.today(),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# --------------------------------------------------------------------------------
# Set variables
# --------------------------------------------------------------------------------
# 'table_list_file_path': This variable contains the location of the table
# list file (CSV).
table_list_file_path = models.Variable.get('table_list_file_path')

# Source bucket
source_bucket = models.Variable.get('gcs_source_bucket')

# Destination bucket
dest_bucket = models.Variable.get('gcs_dest_bucket')

# --------------------------------------------------------------------------------
# Set GCP logging
# --------------------------------------------------------------------------------

logger = logging.getLogger('bq_copy_us_to_eu_01')

# --------------------------------------------------------------------------------
# Functions
# --------------------------------------------------------------------------------


def read_table_list(table_list_file):
    """
    Reads the table list file that is used to create the Airflow tasks in
    the DAG dynamically.
    :param table_list_file: (String) The file location of the table list file,
        e.g. '/home/airflow/framework/table_list.csv'
    :return table_list: (List) List of dictionaries containing the source and
        target tables.
    """
    table_list = []
    logger.info('Reading table_list_file from: %s' % str(table_list_file))
    try:
        with io.open(table_list_file, 'rt', encoding='utf-8') as csv_file:
            csv_reader = csv.reader(csv_file)
            next(csv_reader)  # skip the headers
            for row in csv_reader:
                logger.info(row)
                table_tuple = {
                    'table_source': row[0],
                    'table_dest': row[1]
                }
                table_list.append(table_tuple)
            return table_list
    except IOError as e:
        logger.error('Error opening table_list_file %s: %s',
                     table_list_file, e)

# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG('bq_copy_us_to_eu_01',
                default_args=default_args,
                schedule_interval=None) as dag:
    start = dummy_operator.DummyOperator(
        task_id='start',
        trigger_rule='all_success'
    )

    end = dummy_operator.DummyOperator(
        task_id='end',
        trigger_rule='all_success'
    )
    # Get the table list from the table list file
    all_records = read_table_list(table_list_file_path)

    # Loop over each record in the 'all_records' python list to build up
    # Airflow tasks
    for record in all_records:
        logger.info('Generating tasks to transfer table: {}'.format(record))

        table_source = record['table_source']
        table_dest = record['table_dest']

        # Export the source table from BigQuery to Avro files in the
        # source bucket.
        BQ_to_GCS = bigquery_to_gcs.BigQueryToCloudStorageOperator(
            # Replace ":" with valid character for Airflow task
            task_id='{}_BQ_to_GCS'.format(table_source.replace(":", "_")),
            source_project_dataset_table=table_source,
            destination_cloud_storage_uris=['{}-*.avro'.format(
                'gs://' + source_bucket + '/' + table_source)],
            export_format='AVRO'
        )

        # Copy the exported Avro files from the source bucket to the
        # destination bucket.
        GCS_to_GCS = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
            # Replace ":" with valid character for Airflow task
            task_id='{}_GCS_to_GCS'.format(table_source.replace(":", "_")),
            source_bucket=source_bucket,
            source_object='{}-*.avro'.format(table_source),
            destination_bucket=dest_bucket,
            # destination_object='{}-*.avro'.format(table_dest)
        )

        # Load the copied Avro files from the destination bucket into the
        # destination BigQuery table.
        GCS_to_BQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
            # Replace ":" with valid character for Airflow task
            task_id='{}_GCS_to_BQ'.format(table_dest.replace(":", "_")),
            bucket=dest_bucket,
            source_objects=['{}-*.avro'.format(table_source)],
            destination_project_dataset_table=table_dest,
            source_format='AVRO',
            write_disposition='WRITE_TRUNCATE'
        )

        start >> BQ_to_GCS >> GCS_to_GCS >> GCS_to_BQ >> end
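As an illustration (not part of the committed file): with the sample table list included in this commit (bq_copy_eu_to_us_sample.csv, shown below), read_table_list() returns a list of dicts like the following, and the loop above generates three tasks per table.

# Parsed result for the sample CSV below (illustrative):
# [{'table_source': 'nyc-tlc:green.trips_2014',
#   'table_dest': 'nyc_tlc_EU.trips_2014'},
#  {'table_source': 'nyc-tlc:green.trips_2015',
#   'table_dest': 'nyc_tlc_EU.trips_2015'}]
#
# Example generated task IDs (":" replaced with "_"):
#   nyc-tlc_green.trips_2014_BQ_to_GCS
#   nyc-tlc_green.trips_2014_GCS_to_GCS
#   nyc_tlc_EU.trips_2014_GCS_to_BQ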
New file (51 lines): test that the DAG imports cleanly
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import os.path
import sys

from airflow import models
import pytest


@pytest.fixture(scope='module', autouse=True)
def gcs_plugin():
    # Make the backported gcs_plugin package importable for the duration of
    # this test module, mirroring how the plugins folder is available on the
    # Airflow path in a Composer environment.
    plugins_dir = os.path.abspath(os.path.join(
        os.path.dirname(__file__),
        '..',
        '..',
        'third_party',
        'apache-airflow',
        'plugins',
    ))
    sys.path.append(plugins_dir)
    yield
    sys.path.remove(plugins_dir)
def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. It is a sanity check recommended by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    example_file_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        'bq_copy_eu_to_us_sample.csv')
    models.Variable.set('table_list_file_path', example_file_path)
    models.Variable.set('gcs_source_bucket', 'example-project')
    models.Variable.set('gcs_dest_bucket', 'us-central1-f')
    from . import bq_copy_across_locations  # noqa
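The test sets the three Variables before importing the DAG module because the DAG calls models.Variable.get() at parse time; without them the import fails. A minimal sketch of the same pattern outside pytest follows, assuming an initialized Airflow metadata database; the path and bucket names are placeholders, not values from the commit.

from airflow import models

# Placeholder values; the DAG only needs the Variables to exist when the
# module is parsed.
models.Variable.set('table_list_file_path',
                    '/home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv')
models.Variable.set('gcs_source_bucket', 'my-source-bucket')
models.Variable.set('gcs_dest_bucket', 'my-dest-bucket')

import bq_copy_across_locations  # noqa  (parse check only)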
New file (3 lines): bq_copy_eu_to_us_sample.csv, the sample table list
Source, Target
nyc-tlc:green.trips_2014,nyc_tlc_EU.trips_2014
nyc-tlc:green.trips_2015,nyc_tlc_EU.trips_2015
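A note on the columns, inferred from how the DAG uses them rather than stated in the commit: the source is given as project:dataset.table, a form accepted by BigQueryToCloudStorageOperator's source_project_dataset_table, while the target is dataset.table and is resolved against the default project of the BigQuery connection by GoogleCloudStorageToBigQueryOperator. A hypothetical extra row with placeholder names would look like:

my-project:my_dataset.my_table,my_dataset_EU.my_table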
Updated requirements file (1 line added)
apache-airflow[gcp_api]==1.10.0
kubernetes==7.0.0
scipy==1.1.0
numpy==1.15.1