From 70bf307f3894214c523701940b89ac0b991a3a63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobiasz=20K=C4=99dzierski?= Date: Thu, 21 Jan 2021 11:41:36 +0100 Subject: [PATCH] Add How To Guide for Dataflow (#13461) --- .../cloud/example_dags/example_dataflow.py | 18 +- .../example_dataflow_flex_template.py | 2 + .../example_dags/example_dataflow_sql.py | 2 + .../google/cloud/operators/dataflow.py | 24 ++ .../google/cloud/sensors/dataflow.py | 16 + airflow/providers/google/provider.yaml | 2 + .../operators/cloud/dataflow.rst | 296 ++++++++++++++++++ 7 files changed, 358 insertions(+), 2 deletions(-) create mode 100644 docs/apache-airflow-providers-google/operators/cloud/dataflow.rst diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py index 22b36759f5d3f..a6315bb5cd86f 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataflow.py +++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py @@ -65,7 +65,7 @@ tags=['example'], ) as dag_native_java: - # [START howto_operator_start_java_job] + # [START howto_operator_start_java_job_jar_on_gcs] start_java_job = DataflowCreateJavaJobOperator( task_id="start-java-job", jar=GCS_JAR, @@ -78,8 +78,9 @@ check_if_running=CheckJobRunning.IgnoreJob, location='europe-west3', ) - # [END howto_operator_start_java_job] + # [END howto_operator_start_java_job_jar_on_gcs] + # [START howto_operator_start_java_job_local_jar] jar_to_local = GCSToLocalFilesystemOperator( task_id="jar-to-local", bucket=GCS_JAR_BUCKET_NAME, @@ -99,6 +100,7 @@ check_if_running=CheckJobRunning.WaitForRun, ) jar_to_local >> start_java_job_local + # [END howto_operator_start_java_job_local_jar] with models.DAG( "example_gcp_dataflow_native_python", @@ -144,6 +146,7 @@ schedule_interval=None, # Override to match your needs tags=['example'], ) as dag_native_python_async: + # [START howto_operator_start_python_job_async] start_python_job_async = DataflowCreatePythonJobOperator( task_id="start-python-job-async", py_file=GCS_PYTHON, @@ -158,14 +161,18 @@ location='europe-west3', wait_until_finished=False, ) + # [END howto_operator_start_python_job_async] + # [START howto_sensor_wait_for_job_status] wait_for_python_job_async_done = DataflowJobStatusSensor( task_id="wait-for-python-job-async-done", job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}", expected_statuses={DataflowJobStatus.JOB_STATE_DONE}, location='europe-west3', ) + # [END howto_sensor_wait_for_job_status] + # [START howto_sensor_wait_for_job_metric] def check_metric_scalar_gte(metric_name: str, value: int) -> Callable: """Check is metric greater than equals to given value.""" @@ -187,7 +194,9 @@ def callback(metrics: List[Dict]) -> bool: location='europe-west3', callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100), ) + # [END howto_sensor_wait_for_job_metric] + # [START howto_sensor_wait_for_job_message] def check_message(messages: List[dict]) -> bool: """Check message""" for message in messages: @@ -201,7 +210,9 @@ def check_message(messages: List[dict]) -> bool: location='europe-west3', callback=check_message, ) + # [END howto_sensor_wait_for_job_message] + # [START howto_sensor_wait_for_job_autoscaling_event] def check_autoscaling_event(autoscaling_events: List[dict]) -> bool: """Check autoscaling event""" for autoscaling_event in autoscaling_events: @@ -215,6 +226,7 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool: location='europe-west3', 
callback=check_autoscaling_event, ) + # [END howto_sensor_wait_for_job_autoscaling_event] start_python_job_async >> wait_for_python_job_async_done start_python_job_async >> wait_for_python_job_async_metric @@ -229,9 +241,11 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool: schedule_interval=None, # Override to match your needs tags=['example'], ) as dag_template: + # [START howto_operator_start_template_job] start_template_job = DataflowTemplatedJobStartOperator( task_id="start-template-job", template='gs://dataflow-templates/latest/Word_Count', parameters={'inputFile': "gs://dataflow-samples/shakespeare/kinglear.txt", 'output': GCS_OUTPUT}, location='europe-west3', ) + # [END howto_operator_start_template_job] diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py b/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py index 98c518106814c..4b4a0d163105c 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py +++ b/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py @@ -44,6 +44,7 @@ start_date=days_ago(1), schedule_interval=None, # Override to match your needs ) as dag_flex_template: + # [START howto_operator_start_template_job] start_flex_template = DataflowStartFlexTemplateOperator( task_id="start_flex_template_streaming_beam_sql", body={ @@ -59,3 +60,4 @@ do_xcom_push=True, location=BQ_FLEX_TEMPLATE_LOCATION, ) + # [END howto_operator_start_template_job] diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py b/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py index b7d3d708e6141..a9ec8bc3ab2d1 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py +++ b/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py @@ -39,6 +39,7 @@ schedule_interval=None, # Override to match your needs tags=['example'], ) as dag_sql: + # [START howto_operator_start_sql_job] start_sql = DataflowStartSqlJobOperator( task_id="start_sql_query", job_name=DATAFLOW_SQL_JOB_NAME, @@ -61,3 +62,4 @@ location=DATAFLOW_SQL_LOCATION, do_xcom_push=True, ) + # [END howto_operator_start_sql_job] diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py index 49863dcf4bce5..d92920e893d0e 100644 --- a/airflow/providers/google/cloud/operators/dataflow.py +++ b/airflow/providers/google/cloud/operators/dataflow.py @@ -87,6 +87,10 @@ class DataflowCreateJavaJobOperator(BaseOperator): For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataflowCreateJavaJobOperator` + :param jar: The reference to a self executing DataFlow jar (templated). :type jar: str :param job_name: The 'jobName' to use when executing the DataFlow job @@ -321,6 +325,10 @@ class DataflowTemplatedJobStartOperator(BaseOperator): Start a Templated Cloud DataFlow job. The parameters of the operation will be passed to the job. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataflowTemplatedJobStartOperator` + :param template: The reference to the DataFlow template. 
    :type template: str
    :param job_name: The 'jobName' to use when executing the DataFlow template
@@ -543,6 +551,10 @@ class DataflowStartFlexTemplateOperator(BaseOperator):
     """
     Starts flex templates with the Dataflow pipeline.

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataflowStartFlexTemplateOperator`
+
     :param body: The request body. See:
         https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
     :param location: The location of the Dataflow job (for example europe-west1)
@@ -659,6 +671,14 @@ class DataflowStartSqlJobOperator(BaseOperator):
     """
     Starts Dataflow SQL query.

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataflowStartSqlJobOperator`
+
+    .. warning::
+        This operator requires the ``gcloud`` command (Google Cloud SDK) to be
+        installed on the Airflow worker.
+
     :param job_name: The unique name to assign to the Cloud Dataflow job.
     :type job_name: str
     :param query: The SQL query to execute.
@@ -764,6 +784,10 @@ class DataflowCreatePythonJobOperator(BaseOperator):
     For more detail on job submission have a look at the reference:
     https://cloud.google.com/dataflow/pipelines/specifying-exec-params

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataflowCreatePythonJobOperator`
+
     :param py_file: Reference to the python dataflow pipeline file.py, e.g.,
         /some/local/file/path/to/your/python/pipeline/file. (templated)
     :type py_file: str
diff --git a/airflow/providers/google/cloud/sensors/dataflow.py b/airflow/providers/google/cloud/sensors/dataflow.py
index ffe3580cae3a3..58026e380e255 100644
--- a/airflow/providers/google/cloud/sensors/dataflow.py
+++ b/airflow/providers/google/cloud/sensors/dataflow.py
@@ -32,6 +32,10 @@ class DataflowJobStatusSensor(BaseSensorOperator):
     """
     Checks for the status of a job in Google Cloud Dataflow.

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataflowJobStatusSensor`
+
     :param job_id: ID of the job to be checked.
     :type job_id: str
     :param expected_statuses: The expected state of the operation.
@@ -122,6 +126,10 @@ class DataflowJobMetricsSensor(BaseSensorOperator):
     """
     Checks the metrics of a job in Google Cloud Dataflow.

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataflowJobMetricsSensor`
+
     :param job_id: ID of the job to be checked.
     :type job_id: str
     :param callback: callback which is called with list of read job metrics
@@ -212,6 +220,10 @@ class DataflowJobMessagesSensor(BaseSensorOperator):
     """
     Checks for the job message in Google Cloud Dataflow.

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataflowJobMessagesSensor`
+
     :param job_id: ID of the job to be checked.
     :type job_id: str
     :param callback: callback which is called with list of read job metrics
@@ -302,6 +314,10 @@ class DataflowJobAutoScalingEventsSensor(BaseSensorOperator):
     """
     Checks for the job autoscaling event in Google Cloud Dataflow.

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataflowJobAutoScalingEventsSensor`
+
     :param job_id: ID of the job to be checked.
    :type job_id: str
    :param callback: callback which is called with list of read job metrics
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index abf6b23de9411..67b7af4186f4a 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -201,6 +201,8 @@ integrations:
     tags: [gcp]
   - integration-name: Google Dataflow
     external-doc-url: https://cloud.google.com/dataflow/
+    how-to-guide:
+      - /docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
     logo: /integration-logos/gcp/Cloud-Dataflow.png
     tags: [gcp]
   - integration-name: Google Data Fusion
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
new file mode 100644
index 0000000000000..9f86bb9b8e324
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
@@ -0,0 +1,296 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Dataflow Operators
+===============================
+
+`Dataflow <https://cloud.google.com/dataflow/>`__ is a managed service for
+executing a wide variety of data processing patterns. These pipelines are created
+using the Apache Beam programming model which allows for both batch and streaming processing.
+
+.. contents::
+    :depth: 1
+    :local:
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+Ways to run a data pipeline
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+There are several ways to run a Dataflow pipeline depending on your environment and source files:
+
+- **Non-templated pipeline**: Developers can run the pipeline as a local process on the Airflow worker
+  if they have a ``*.jar`` file for Java or a ``*.py`` file for Python. This also means that the necessary system
+  dependencies must be installed on the worker. For Java, the worker must have the Java Runtime Environment (JRE)
+  installed; for Python, the Python interpreter. The runtime versions must be compatible with the pipeline versions.
+  This is the fastest way to start a pipeline, but its reliance on system dependencies also makes it the most
+  error-prone. See :ref:`howto/operator:DataflowCreateJavaJobOperator` and
+  :ref:`howto/operator:DataflowCreatePythonJobOperator` for more detailed information.
+- **Templated pipeline**: The developer can make the pipeline independent of the environment by preparing
+  a template that will then be run on a machine managed by Google. This way, changes to the environment
+  won't affect your pipeline. There are two types of templates:
+
+  - **Classic templates**. Developers run the pipeline and create a template. The Apache Beam SDK stages
+    files in Cloud Storage, creates a template file (similar to a job request),
+    and saves the template file in Cloud Storage.
+    See: :ref:`howto/operator:DataflowTemplatedJobStartOperator`
+
+  - **Flex Templates**. Developers package the pipeline into a Docker image and then use the ``gcloud``
+    command-line tool to build and save the Flex Template spec file in Cloud Storage. See:
+    :ref:`howto/operator:DataflowStartFlexTemplateOperator`
+
+- **SQL pipeline**: Developers can write the pipeline as an SQL statement and then execute it in Dataflow. See:
+  :ref:`howto/operator:DataflowStartSqlJobOperator`
+
+It is a good idea to test your pipeline using the non-templated pipeline,
+and then run the pipeline in production using the templates.
+
+For details on the differences between the pipeline types, see
+`Dataflow templates <https://cloud.google.com/dataflow/docs/concepts/dataflow-templates>`__
+in the Google Cloud documentation.
+
+Starting Non-templated pipeline
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To create a new pipeline using the source file (JAR in Java or Python file), use the create job operators
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+or
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`.
+The source file can be located on GCS or on the local filesystem.
+
+.. _howto/operator:DataflowCreateJavaJobOperator:
+
+Java SDK pipelines
+""""""""""""""""""
+
+For a Java pipeline, the ``jar`` argument must be specified for
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+as it contains the pipeline to be executed on Dataflow. The JAR can be on GCS, from which Airflow is
+able to download it, or on the local filesystem (provide the absolute path to it).
+
+Here is an example of creating and running a pipeline in Java with the jar stored on GCS:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_java_job_jar_on_gcs]
+    :end-before: [END howto_operator_start_java_job_jar_on_gcs]
+
+Here is an example of creating and running a pipeline in Java with the jar available on the local file system:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_java_job_local_jar]
+    :end-before: [END howto_operator_start_java_job_local_jar]
+
+.. _howto/operator:DataflowCreatePythonJobOperator:
+
+Python SDK pipelines
+""""""""""""""""""""
+
+The ``py_file`` argument must be specified for
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
+as it contains the pipeline to be executed on Dataflow. The Python file can be on GCS, from which Airflow is
+able to download it, or on the local filesystem (provide the absolute path to it).
+
+The ``py_interpreter`` argument specifies the Python version to be used when executing the pipeline; the default
+is ``python3``. If your Airflow instance is running on Python 2, specify ``python2`` and ensure your ``py_file`` is
+written in Python 2. For best results, use Python 3.
+
+If the ``py_requirements`` argument is specified, a temporary Python virtual environment with the specified
+requirements will be created and the pipeline will run within it.
+
+The ``py_system_site_packages`` argument specifies whether all the Python packages from your Airflow instance
+will be accessible within the virtual environment (if ``py_requirements`` is specified); avoid this
+unless the Dataflow job requires it.
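+For illustration, here is a minimal sketch of how these arguments fit together; the GCS path and the
+pinned Beam requirement are placeholders that you would replace with your own values:
+
+.. code-block:: python
+
+    start_python_job = DataflowCreatePythonJobOperator(
+        task_id="start-python-job",
+        py_file="gs://my-bucket/my_pipeline.py",  # hypothetical pipeline file on GCS
+        py_interpreter="python3",
+        py_requirements=["apache-beam[gcp]==2.25.0"],  # installed into a temporary virtualenv
+        py_system_site_packages=False,  # keep the worker's site-packages out of that virtualenv
+        options={"output": "gs://my-bucket/output"},  # hypothetical pipeline options
+        location="europe-west3",
+    )
+
+The provider also ships a complete, runnable example: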
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_python_job]
+    :end-before: [END howto_operator_start_python_job]
+
+Execution models
+^^^^^^^^^^^^^^^^
+
+Dataflow pipelines can be executed in several modes: batch asynchronously (fire and forget),
+batch blocking (wait until completion), or streaming (run indefinitely).
+In Airflow it is best practice to use asynchronous batch pipelines or streams and use sensors to listen for
+the expected job state.
+
+By default :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`,
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`,
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator` and
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator`
+have the ``wait_until_finished`` argument set to ``None``, which causes different behaviour depending on the
+type of pipeline:
+
+* for a streaming pipeline, wait for the job to start,
+* for a batch pipeline, wait for the job to complete.
+
+If ``wait_until_finished`` is set to ``True``, the operator will always wait for the end of pipeline execution.
+If it is set to ``False``, the operator only submits the job.
+
+See: `Configuring PipelineOptions for execution on the Cloud Dataflow service
+<https://cloud.google.com/dataflow/pipelines/specifying-exec-params>`_
+
+Asynchronous execution
+""""""""""""""""""""""
+
+Dataflow batch jobs are by default asynchronous; however, this is dependent on the application code (contained in the JAR
+or Python file) and how it is written. In order for the Dataflow job to execute asynchronously, ensure the
+pipeline objects are not being waited upon (not calling ``waitUntilFinish`` or ``wait_until_finish`` on the
+``PipelineResult`` in your application code).
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_python_job_async]
+    :end-before: [END howto_operator_start_python_job_async]
+
+Blocking execution
+""""""""""""""""""
+
+In order for a Dataflow job to execute and wait until completion, ensure the pipeline objects are waited upon
+in the application code. This can be done for the Java SDK by calling ``waitUntilFinish`` on the ``PipelineResult``
+returned from ``pipeline.run()`` or for the Python SDK by calling ``wait_until_finish`` on the ``PipelineResult``
+returned from ``pipeline.run()``.
+
+Blocking jobs should be avoided because Airflow keeps a background process running for as long as it waits for
+the Dataflow job to complete, which increases Airflow's resource consumption.
+
+Streaming execution
+"""""""""""""""""""
+
+To execute a streaming Dataflow job, ensure the streaming option is set (for Python) or read from an unbounded data
+source, such as Pub/Sub, in your pipeline (for Java).
+
+Setting the ``drain_pipeline`` argument to ``True`` stops a streaming job by draining it
+instead of canceling it when the task instance is killed.
+
+See `Stopping a running pipeline
+<https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline>`_.
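+For example, a minimal sketch of starting a streaming pipeline and letting Airflow drain it when the task is
+killed might look as follows; the GCS path is a placeholder and the referenced pipeline is assumed to enable
+the streaming option itself:
+
+.. code-block:: python
+
+    start_streaming_python_job = DataflowCreatePythonJobOperator(
+        task_id="start-streaming-python-job",
+        py_file="gs://my-bucket/my_streaming_pipeline.py",  # hypothetical streaming pipeline
+        location="europe-west3",
+        wait_until_finished=False,  # only wait for the streaming job to start
+        drain_pipeline=True,  # drain instead of cancel when the task instance is killed
+    )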
+.. _howto/operator:DataflowTemplatedJobStartOperator:
+.. _howto/operator:DataflowStartFlexTemplateOperator:
+
+Templated jobs
+""""""""""""""
+
+Templates give the ability to stage a pipeline on Cloud Storage and run it from there. This
+provides flexibility in the development workflow as it separates the development of a pipeline
+from the staging and execution steps. There are two types of templates for Dataflow: Classic and Flex.
+See the `official documentation for Dataflow templates
+<https://cloud.google.com/dataflow/docs/concepts/dataflow-templates>`_ for more information.
+
+Here is an example of running a Classic template with
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator`:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_template_job]
+    :end-before: [END howto_operator_start_template_job]
+
+See the `list of Google-provided templates that can be used with this operator
+<https://cloud.google.com/dataflow/docs/guides/templates/provided-templates>`_.
+
+Here is an example of running a Flex template with
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator`:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_template_job]
+    :end-before: [END howto_operator_start_template_job]
+
+.. _howto/operator:DataflowStartSqlJobOperator:
+
+Dataflow SQL
+""""""""""""
+
+Dataflow SQL supports a variant of the ZetaSQL query syntax and includes additional streaming
+extensions for running Dataflow streaming jobs.
+
+Here is an example of running a Dataflow SQL job with
+:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartSqlJobOperator`:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_sql_job]
+    :end-before: [END howto_operator_start_sql_job]
+
+.. warning::
+    This operator requires the ``gcloud`` command (Google Cloud SDK) to be installed on the Airflow worker.
+
+See the `Dataflow SQL reference
+<https://cloud.google.com/dataflow/docs/reference/sql>`_.
+
+.. _howto/operator:DataflowJobStatusSensor:
+.. _howto/operator:DataflowJobMetricsSensor:
+.. _howto/operator:DataflowJobMessagesSensor:
+.. _howto/operator:DataflowJobAutoScalingEventsSensor:
+
+Sensors
+^^^^^^^
+
+When a job is triggered asynchronously, sensors may be used to run checks for specific job properties.
+
+:class:`~airflow.providers.google.cloud.sensors.dataflow.DataflowJobStatusSensor` waits for the job to reach one
+of the expected statuses:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_wait_for_job_status]
+    :end-before: [END howto_sensor_wait_for_job_status]
+
+:class:`~airflow.providers.google.cloud.sensors.dataflow.DataflowJobMetricsSensor` checks the job metrics with a
+user-provided callback:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_wait_for_job_metric]
+    :end-before: [END howto_sensor_wait_for_job_metric]
+
+:class:`~airflow.providers.google.cloud.sensors.dataflow.DataflowJobMessagesSensor` checks the job messages:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_wait_for_job_message]
+    :end-before: [END howto_sensor_wait_for_job_message]
+
+:class:`~airflow.providers.google.cloud.sensors.dataflow.DataflowJobAutoScalingEventsSensor` checks the job
+autoscaling events:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_wait_for_job_autoscaling_event]
+    :end-before: [END howto_sensor_wait_for_job_autoscaling_event]
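+As in the example DAG above, the sensors are typically wired downstream of the task that started the job and
+read the job ID from XCom. A minimal sketch (task IDs are illustrative):
+
+.. code-block:: python
+
+    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
+
+    wait_for_python_job_done = DataflowJobStatusSensor(
+        task_id="wait-for-python-job-done",
+        # the operator that started the job pushes its metadata, including the job ID, to XCom
+        job_id="{{ task_instance.xcom_pull('start-python-job-async')['job_id'] }}",
+        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
+        location="europe-west3",
+    )
+
+    start_python_job_async >> wait_for_python_job_done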
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Google Cloud API Documentation <https://cloud.google.com/dataflow/docs/apis>`__
+* `Apache Beam Documentation <https://beam.apache.org/documentation/>`__
+* `Product Documentation <https://cloud.google.com/dataflow/docs/>`__
+* `Dataflow Monitoring Interface <https://cloud.google.com/dataflow/docs/guides/using-monitoring-intf>`__
+* `Dataflow Command-line Interface <https://cloud.google.com/dataflow/docs/guides/using-command-line-intf>`__