diff --git a/UPDATING.md b/UPDATING.md
index a2ac8c33ef00d..1e816116cb68d 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -1868,6 +1868,58 @@ https://cloud.google.com/compute/docs/disks/performance
 
 Hence, the default value for `master_disk_size` in `DataprocCreateClusterOperator` has been changed from 500GB to 1TB.
 
+##### Generating Cluster Config
+
+If you are upgrading from Airflow 1.10.x and are not using **CLUSTER_CONFIG**,
+you can easily generate one with the **make()** method of `airflow.providers.google.cloud.operators.dataproc.ClusterGenerator`.
+
+This is especially useful if you were relying on the **metadata** argument of the older API; see [AIRFLOW-16911](https://github.com/apache/airflow/issues/16911) for details.
+
+For example, your cluster creation may look like this in **v1.10.x**:
+
+```python
+path = "gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh"
+
+create_cluster = DataprocClusterCreateOperator(
+    task_id="create_dataproc_cluster",
+    cluster_name="test",
+    project_id="test",
+    zone="us-central1-a",
+    region="us-central1",
+    master_machine_type="n1-standard-4",
+    worker_machine_type="n1-standard-4",
+    num_workers=2,
+    storage_bucket="test_bucket",
+    init_actions_uris=[path],
+    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
+)
+```
+
+After upgrading to **v2.x.x** and using **CLUSTER_CONFIG**, it will look like this:
+
+```python
+path = "gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh"
+
+CLUSTER_CONFIG = ClusterGenerator(
+    project_id="test",
+    zone="us-central1-a",
+    master_machine_type="n1-standard-4",
+    worker_machine_type="n1-standard-4",
+    num_workers=2,
+    storage_bucket="test",
+    init_actions_uris=[path],
+    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
+).make()
+
+create_cluster_operator = DataprocCreateClusterOperator(
+    task_id="create_dataproc_cluster",
+    cluster_name="test",
+    project_id="test",
+    region="us-central1",
+    cluster_config=CLUSTER_CONFIG,
+)
+```
+
 #### `airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator`
 
 We changed signature of `BigQueryGetDatasetTablesOperator`.
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index 49594981d24a9..3ddff3133316b 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -23,7 +23,8 @@
 import os
 
 from airflow import models
 from airflow.providers.google.cloud.operators.dataproc import (
+    ClusterGenerator,
     DataprocCreateClusterOperator,
     DataprocCreateWorkflowTemplateOperator,
     DataprocDeleteClusterOperator,
@@ -64,6 +65,30 @@
 
 # [END how_to_cloud_dataproc_create_cluster]
 
+# Cluster definition: Generating Cluster Config for DataprocCreateClusterOperator
+# [START how_to_cloud_dataproc_create_cluster_generate_cluster_config]
+path = "gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh"
+
+CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
+    project_id="test",
+    zone="us-central1-a",
+    master_machine_type="n1-standard-4",
+    worker_machine_type="n1-standard-4",
+    num_workers=2,
+    storage_bucket="test",
+    init_actions_uris=[path],
+    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl'},
+).make()
+
+create_cluster_operator = DataprocCreateClusterOperator(
+    task_id='create_dataproc_cluster',
+    cluster_name="test",
+    project_id="test",
+    region="us-central1",
+    cluster_config=CLUSTER_GENERATOR_CONFIG,
+)
+# [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
+
 # Update options
 # [START how_to_cloud_dataproc_updatemask_cluster_operator]
 CLUSTER_UPDATE = {
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
index 55449e9fbcd55..3d506d0f5aeb2 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -57,6 +57,19 @@ With this configuration we can create the cluster:
     :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
     :end-before: [END how_to_cloud_dataproc_create_cluster_operator]
 
+Generating Cluster Config
+^^^^^^^^^^^^^^^^^^^^^^^^^
+You can also generate a **CLUSTER_CONFIG** with the functional API.
+This can be done easily with the **make()** method of
+:class:`~airflow.providers.google.cloud.operators.dataproc.ClusterGenerator`.
+You can generate and use the config as follows:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_create_cluster_generate_cluster_config]
+    :end-before: [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
+
 Update a cluster
 ----------------
 You can scale the cluster up or down by providing a cluster config and a updateMask.
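As a quick sanity check during the migration, it can help to inspect the plain dict that `ClusterGenerator(...).make()` returns before handing it to `DataprocCreateClusterOperator`. The sketch below is illustrative only and is not part of the change above: it assumes the generated dict follows the Dataproc `ClusterConfig` field names (for example `gce_cluster_config.metadata` and `initialization_actions`), which should be confirmed against the printed output for your provider version.

```python
from pprint import pprint

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

path = "gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh"

# Build the config exactly as in the example above.
generated_config = ClusterGenerator(
    project_id="test",
    zone="us-central1-a",
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=2,
    storage_bucket="test",
    init_actions_uris=[path],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
).make()

# Print the whole dict to see where each ClusterGenerator argument ended up.
pprint(generated_config)

# Assumption: metadata and init actions map onto Dataproc ClusterConfig fields;
# the exact key names below are illustrative, so verify them against the
# pprint output rather than relying on this sketch.
print(generated_config.get("gce_cluster_config", {}).get("metadata"))
print(generated_config.get("initialization_actions"))
```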