From 6e8366527e08a8a8292d6244b84c74b8eba590b6 Mon Sep 17 00:00:00 2001 From: Beata Kossakowska Date: Fri, 9 Sep 2022 10:32:31 +0000 Subject: [PATCH 1/2] Cloud Functions Operators assets & system tests migration (AIP-47) --- .../google/cloud/links/cloud_functions.py | 77 +++++++++ .../google/cloud/operators/functions.py | 36 ++++ airflow/providers/google/provider.yaml | 2 + .../operators/cloud/functions.rst | 12 +- .../google/cloud/operators/test_functions.py | 42 +++-- .../google/cloud/cloud_functions/__init__.py} | 21 --- .../cloud_functions/example_functions.py | 156 ++++++++++++++++++ 7 files changed, 303 insertions(+), 43 deletions(-) create mode 100644 airflow/providers/google/cloud/links/cloud_functions.py rename tests/{providers/google/cloud/operators/test_functions_system.py => system/providers/google/cloud/cloud_functions/__init__.py} (55%) create mode 100644 tests/system/providers/google/cloud/cloud_functions/example_functions.py diff --git a/airflow/providers/google/cloud/links/cloud_functions.py b/airflow/providers/google/cloud/links/cloud_functions.py new file mode 100644 index 0000000000000..ec3fd023502e0 --- /dev/null +++ b/airflow/providers/google/cloud/links/cloud_functions.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""This module contains Google Cloud Functions links."""
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+CLOUD_FUNCTIONS_BASE_LINK = "https://console.cloud.google.com/functions"
+
+CLOUD_FUNCTIONS_DETAILS_LINK = (
+    CLOUD_FUNCTIONS_BASE_LINK + "/details/{location}/{function_name}?project={project_id}"
+)
+
+CLOUD_FUNCTIONS_LIST_LINK = CLOUD_FUNCTIONS_BASE_LINK + "/list?project={project_id}"
+
+
+class CloudFunctionsDetailsLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Functions Details Link"""
+
+    name = "Cloud Functions Details"
+    key = "cloud_functions_details"
+    format_str = CLOUD_FUNCTIONS_DETAILS_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        function_name: str,
+        location: str,
+        project_id: str,
+    ):
+
+        task_instance.xcom_push(
+            context,
+            key=CloudFunctionsDetailsLink.key,
+            value={"function_name": function_name, "location": location, "project_id": project_id},
+        )
+
+
+class CloudFunctionsListLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Functions List Link"""
+
+    name = "Cloud Functions List"
+    key = "cloud_functions_list"
+    format_str = CLOUD_FUNCTIONS_LIST_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        project_id: str,
+    ):
+        task_instance.xcom_push(
+            context,
+            key=CloudFunctionsListLink.key,
+            value={"project_id": project_id},
+        )
diff --git a/airflow/providers/google/cloud/operators/functions.py b/airflow/providers/google/cloud/operators/functions.py
index 3173e3701e983..5bee6185f16be 100644
--- a/airflow/providers/google/cloud/operators/functions.py
+++ b/airflow/providers/google/cloud/operators/functions.py
@@ -26,6 +26,10 @@
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.functions import CloudFunctionsHook
+from airflow.providers.google.cloud.links.cloud_functions import (
+    CloudFunctionsDetailsLink,
+    CloudFunctionsListLink,
+)
 from airflow.providers.google.cloud.utils.field_validator import (
     GcpBodyFieldValidator,
     GcpFieldValidationException,
@@ -144,6 +148,7 @@ class CloudFunctionDeployFunctionOperator(BaseOperator):
         'impersonation_chain',
     )
     # [END gcf_function_deploy_template_fields]
+    operator_extra_links = (CloudFunctionsDetailsLink(),)
 
     def __init__(
         self,
@@ -226,6 +231,15 @@ def execute(self, context: Context):
             self._create_new_function(hook)
         else:
             self._update_function(hook)
+        project_id = self.project_id or hook.project_id
+        if project_id:
+            CloudFunctionsDetailsLink.persist(
+                context=context,
+                task_instance=self,
+                location=self.location,
+                project_id=project_id,
+                function_name=self.body['name'].split("/")[-1],
+            )
 
 
 GCF_SOURCE_ARCHIVE_URL = 'sourceArchiveUrl'
@@ -347,6 +361,7 @@ class CloudFunctionDeleteFunctionOperator(BaseOperator):
         'impersonation_chain',
     )
     # [END gcf_function_delete_template_fields]
+    operator_extra_links = (CloudFunctionsListLink(),)
 
     def __init__(
         self,
@@ -355,9 +370,11 @@ def __init__(
         gcp_conn_id: str = 'google_cloud_default',
         api_version: str = 'v1',
         impersonation_chain: str | Sequence[str] | None = None,
+        project_id: str | None = None,
         **kwargs,
     ) -> None:
         self.name = name
+        self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
         self.api_version = api_version
         self.impersonation_chain = impersonation_chain
@@ -379,6 +396,13 @@ def execute(self,
context: Context): impersonation_chain=self.impersonation_chain, ) try: + project_id = self.project_id or hook.project_id + if project_id: + CloudFunctionsListLink.persist( + context=context, + task_instance=self, + project_id=project_id, + ) return hook.delete_function(self.name) except HttpError as e: status = e.resp.status @@ -423,6 +447,7 @@ class CloudFunctionInvokeFunctionOperator(BaseOperator): 'project_id', 'impersonation_chain', ) + operator_extra_links = (CloudFunctionsDetailsLink(),) def __init__( self, @@ -460,4 +485,15 @@ def execute(self, context: Context): ) self.log.info('Function called successfully. Execution id %s', result.get('executionId')) self.xcom_push(context=context, key='execution_id', value=result.get('executionId')) + + project_id = self.project_id or hook.project_id + if project_id: + CloudFunctionsDetailsLink.persist( + context=context, + task_instance=self, + location=self.location, + project_id=project_id, + function_name=self.function_id, + ) + return result diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 626bc7e7f4318..b9f653da2876e 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -1015,6 +1015,8 @@ extra-links: - airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggersListLink - airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggerDetailsLink - airflow.providers.google.cloud.links.life_sciences.LifeSciencesLink + - airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsDetailsLink + - airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsListLink - airflow.providers.google.common.links.storage.StorageLink - airflow.providers.google.common.links.storage.FileDetailsLink diff --git a/docs/apache-airflow-providers-google/operators/cloud/functions.rst b/docs/apache-airflow-providers-google/operators/cloud/functions.rst index 9527cad235552..932f5ce84181a 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/functions.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/functions.rst @@ -38,7 +38,7 @@ For parameter definition, take a look at Using the operator """""""""""""""""" -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py :language: python :dedent: 4 :start-after: [START howto_operator_gcf_delete] @@ -77,7 +77,7 @@ Arguments When a DAG is created, the default_args dictionary can be used to pass arguments common with other tasks: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py :language: python :start-after: [START howto_operator_gcf_default_args] :end-before: [END howto_operator_gcf_default_args] @@ -101,19 +101,19 @@ Using the operator Depending on the combination of parameters, the Function's source code can be obtained from different sources: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py :language: python :start-after: [START howto_operator_gcf_deploy_body] :end-before: [END howto_operator_gcf_deploy_body] -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py :language: python :start-after: [START howto_operator_gcf_deploy_variants] :end-before: [END howto_operator_gcf_deploy_variants] The code to create the operator: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py :language: python :dedent: 4 :start-after: [START howto_operator_gcf_deploy] @@ -122,7 +122,7 @@ The code to create the operator: You can also create the operator without project id - project id will be retrieved from the Google Cloud connection used: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py :language: python :dedent: 4 :start-after: [START howto_operator_gcf_deploy_no_project_id] diff --git a/tests/providers/google/cloud/operators/test_functions.py b/tests/providers/google/cloud/operators/test_functions.py index d0ad65b1a1ff5..97b3e4e6ff02d 100644 --- a/tests/providers/google/cloud/operators/test_functions.py +++ b/tests/providers/google/cloud/operators/test_functions.py @@ -91,7 +91,7 @@ def test_deploy_execute(self, mock_hook): op = CloudFunctionDeployFunctionOperator( project_id=GCP_PROJECT_ID, location=GCP_LOCATION, body=deepcopy(VALID_BODY), task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -113,7 +113,7 @@ def test_update_function_if_exists(self, mock_hook): op = CloudFunctionDeployFunctionOperator( project_id=GCP_PROJECT_ID, location=GCP_LOCATION, body=deepcopy(VALID_BODY), task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -137,7 +137,7 @@ def test_empty_project_id_is_ok(self, mock_hook): operator = CloudFunctionDeployFunctionOperator( location="test_region", body=deepcopy(VALID_BODY), task_id="id" ) - operator.execute(None) + operator.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -176,7 +176,7 @@ def test_correct_runtime_field(self, runtime, mock_hook): op = CloudFunctionDeployFunctionOperator( project_id="test_project_id", location="test_region", body=body, task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -203,7 +203,7 @@ def test_valid_network_field(self, network, mock_hook): op = CloudFunctionDeployFunctionOperator( project_id="test_project_id", location="test_region", body=body, task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -229,7 +229,7 @@ def test_valid_labels_field(self, labels, mock_hook): op = CloudFunctionDeployFunctionOperator( project_id="test_project_id", location="test_region", body=body, task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -244,7 +244,7 @@ def test_validation_disabled(self, mock_hook): op = CloudFunctionDeployFunctionOperator( project_id="test_project_id", location="test_region", body=body, validate_body=False, 
task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -421,7 +421,7 @@ def test_valid_source_code_union_field(self, source_code, project_id, mock_hook) op = CloudFunctionDeployFunctionOperator( location="test_region", body=body, task_id="id", zip_path=zip_path ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', impersonation_chain=None, ) @@ -548,7 +548,7 @@ def test_valid_trigger_union_field(self, trigger, mock_hook): body=body, task_id="id", ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -570,7 +570,7 @@ def test_extra_parameter(self, mock_hook): op = CloudFunctionDeployFunctionOperator( project_id="test_project_id", location="test_region", body=body, task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -600,7 +600,7 @@ class TestGcfFunctionDelete(unittest.TestCase): def test_delete_execute(self, mock_hook): mock_hook.return_value.delete_function.return_value = self._DELETE_FUNCTION_EXPECTED op = CloudFunctionDeleteFunctionOperator(name=self._FUNCTION_NAME, task_id="id") - result = op.execute(None) + result = op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -616,7 +616,7 @@ def test_correct_name(self, mock_hook): op = CloudFunctionDeleteFunctionOperator( name="projects/project_name/locations/project_location/functions/function_name", task_id="id" ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -647,7 +647,7 @@ def test_gcf_error_silenced_when_function_doesnt_exist(self, mock_hook): mock_hook.return_value.delete_function.side_effect = mock.Mock( side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found') ) - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -666,7 +666,7 @@ def test_non_404_gcf_error_bubbled_up(self, mock_hook): ) with pytest.raises(HttpError): - op.execute(None) + op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', @@ -701,7 +701,9 @@ def test_execute(self, mock_gcf_hook, mock_xcom): gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, ) - op.execute(None) + context = mock.MagicMock() + op.execute(context=context) + mock_gcf_hook.assert_called_once_with( api_version=api_version, gcp_conn_id=gcp_conn_id, @@ -712,4 +714,12 @@ def test_execute(self, mock_gcf_hook, mock_xcom): function_id=function_id, input_data=payload, location=GCP_LOCATION, project_id=GCP_PROJECT_ID ) - mock_xcom.assert_called_once_with(context=None, key='execution_id', value=exec_id) + mock_xcom.assert_called_with( + context, + key="cloud_functions_details", + value={ + 'location': GCP_LOCATION, + 'function_name': function_id, + 'project_id': GCP_PROJECT_ID, + }, + ) diff --git a/tests/providers/google/cloud/operators/test_functions_system.py b/tests/system/providers/google/cloud/cloud_functions/__init__.py similarity index 55% rename from tests/providers/google/cloud/operators/test_functions_system.py rename to 
tests/system/providers/google/cloud/cloud_functions/__init__.py index 4768b492fd733..13a83393a9124 100644 --- a/tests/providers/google/cloud/operators/test_functions_system.py +++ b/tests/system/providers/google/cloud/cloud_functions/__init__.py @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,23 +14,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from __future__ import annotations - -import pytest - -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_FUNCTION_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_FUNCTION_KEY) -class GcpFunctionExampleDagsSystemTest(GoogleSystemTest): - def setUp(self): - super().setUp() - - @provide_gcp_context(GCP_FUNCTION_KEY) - def test_run_example_dag_function(self): - self.run_dag('example_gcp_function', CLOUD_DAG_FOLDER) - - def tearDown(self): - super().tearDown() diff --git a/tests/system/providers/google/cloud/cloud_functions/example_functions.py b/tests/system/providers/google/cloud/cloud_functions/example_functions.py new file mode 100644 index 0000000000000..8257d34214a88 --- /dev/null +++ b/tests/system/providers/google/cloud/cloud_functions/example_functions.py @@ -0,0 +1,156 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that displays interactions with Google Cloud Functions. +It creates a function and then deletes it. + +This DAG relies on the following OS environment variables +https://airflow.apache.org/concepts.html#variables + +* PROJECT_ID - Google Cloud Project to use for the Cloud Function. +* LOCATION - Google Cloud Functions region where the function should be + created. +* ENTRYPOINT - Name of the executable function in the source code. 
+* and one of the below: + + * SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage + + * SOURCE_UPLOAD_URL - Generated upload URL for the zipped source and ZIP_PATH - Local path to + the zipped source archive + + * SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function + is defined in a supported Cloud Source Repository URL format + https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository + +""" + +import os +from datetime import datetime +from typing import Any, Dict + +from airflow import models +from airflow.models.baseoperator import chain +from airflow.providers.google.cloud.operators.functions import ( + CloudFunctionDeleteFunctionOperator, + CloudFunctionDeployFunctionOperator, + CloudFunctionInvokeFunctionOperator, +) + +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "example_gcp_function" + +# make sure there are no dashes in function name (!) +SHORT_FUNCTION_NAME = 'hello' + +LOCATION = 'europe-west1' + +FUNCTION_NAME = f'projects/{PROJECT_ID}/locations/{LOCATION}/functions/{SHORT_FUNCTION_NAME}' +SOURCE_ARCHIVE_URL = '' +SOURCE_UPLOAD_URL = '' + +repo = 'test-repo' +SOURCE_REPOSITORY = ( + f'https://source.developers.google.com/projects/{PROJECT_ID}/repos/{repo}/moveable-aliases/master' +) + +ZIP_PATH = '' +ENTRYPOINT = 'helloWorld' +RUNTIME = 'nodejs14' +VALIDATE_BODY = True + +# [START howto_operator_gcf_deploy_body] +body = {"name": FUNCTION_NAME, "entryPoint": ENTRYPOINT, "runtime": RUNTIME, "httpsTrigger": {}} +# [END howto_operator_gcf_deploy_body] + +# [START howto_operator_gcf_default_args] +default_args: Dict[str, Any] = {'retries': 3} +# [END howto_operator_gcf_default_args] + +# [START howto_operator_gcf_deploy_variants] +if SOURCE_ARCHIVE_URL: + body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL +elif SOURCE_REPOSITORY: + body['sourceRepository'] = {'url': SOURCE_REPOSITORY} +elif ZIP_PATH: + body['sourceUploadUrl'] = '' + default_args['zip_path'] = ZIP_PATH +elif SOURCE_UPLOAD_URL: + body['sourceUploadUrl'] = SOURCE_UPLOAD_URL +else: + raise Exception("Please provide one of the source_code parameters") +# [END howto_operator_gcf_deploy_variants] + + +with models.DAG( + DAG_ID, + default_args=default_args, + start_date=datetime(2021, 1, 1), + catchup=False, + tags=['example'], +) as dag: + + # [START howto_operator_gcf_deploy] + deploy_task = CloudFunctionDeployFunctionOperator( + task_id="gcf_deploy_task", + project_id=PROJECT_ID, + location=LOCATION, + body=body, + validate_body=VALIDATE_BODY, + ) + # [END howto_operator_gcf_deploy] + + # [START howto_operator_gcf_deploy_no_project_id] + deploy2_task = CloudFunctionDeployFunctionOperator( + task_id="gcf_deploy2_task", location=LOCATION, body=body, validate_body=VALIDATE_BODY + ) + # [END howto_operator_gcf_deploy_no_project_id] + + # [START howto_operator_gcf_invoke_function] + invoke_task = CloudFunctionInvokeFunctionOperator( + task_id="invoke_task", + project_id=PROJECT_ID, + location=LOCATION, + input_data={}, + function_id=SHORT_FUNCTION_NAME, + ) + # [END howto_operator_gcf_invoke_function] + + # [START howto_operator_gcf_delete] + delete_task = CloudFunctionDeleteFunctionOperator(task_id="gcf_delete_task", name=FUNCTION_NAME) + # [END howto_operator_gcf_delete] + + chain( + deploy_task, + deploy2_task, + invoke_task, + delete_task, + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is 
part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From b49014bc84c1d0bb4215a298940d3639c98d0a44 Mon Sep 17 00:00:00 2001 From: Beata Kossakowska Date: Wed, 14 Sep 2022 13:08:24 +0000 Subject: [PATCH 2/2] Fix static checks. --- .../cloud/example_dags/example_functions.py | 127 ------------------ .../google/cloud/links/cloud_functions.py | 7 +- .../cloud_functions/example_functions.py | 6 +- 3 files changed, 9 insertions(+), 131 deletions(-) delete mode 100644 airflow/providers/google/cloud/example_dags/example_functions.py diff --git a/airflow/providers/google/cloud/example_dags/example_functions.py b/airflow/providers/google/cloud/example_dags/example_functions.py deleted file mode 100644 index be6f5bf689796..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_functions.py +++ /dev/null @@ -1,127 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example Airflow DAG that displays interactions with Google Cloud Functions. -It creates a function and then deletes it. - -This DAG relies on the following OS environment variables -https://airflow.apache.org/concepts.html#variables - -* GCP_PROJECT_ID - Google Cloud Project to use for the Cloud Function. -* GCP_LOCATION - Google Cloud Functions region where the function should be - created. -* GCF_ENTRYPOINT - Name of the executable function in the source code. -* and one of the below: - - * GCF_SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage - - * GCF_SOURCE_UPLOAD_URL - Generated upload URL for the zipped source and GCF_ZIP_PATH - Local path to - the zipped source archive - - * GCF_SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function - is defined in a supported Cloud Source Repository URL format - https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository - -""" -from __future__ import annotations - -import os -from datetime import datetime -from typing import Any - -from airflow import models -from airflow.providers.google.cloud.operators.functions import ( - CloudFunctionDeleteFunctionOperator, - CloudFunctionDeployFunctionOperator, - CloudFunctionInvokeFunctionOperator, -) - -GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project') -GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1') -# make sure there are no dashes in function name (!) 
-GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').replace("-", "_") -FUNCTION_NAME = f'projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/functions/{GCF_SHORT_FUNCTION_NAME}' -GCF_SOURCE_ARCHIVE_URL = os.environ.get('GCF_SOURCE_ARCHIVE_URL', '') -GCF_SOURCE_UPLOAD_URL = os.environ.get('GCF_SOURCE_UPLOAD_URL', '') -GCF_SOURCE_REPOSITORY = os.environ.get( - 'GCF_SOURCE_REPOSITORY', - f'https://source.developers.google.com/projects/{GCP_PROJECT_ID}/' - f'repos/hello-world/moveable-aliases/master', -) -GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '') -GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld') -GCF_RUNTIME = 'nodejs14' -GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', "True") == "True" - -# [START howto_operator_gcf_deploy_body] -body = {"name": FUNCTION_NAME, "entryPoint": GCF_ENTRYPOINT, "runtime": GCF_RUNTIME, "httpsTrigger": {}} -# [END howto_operator_gcf_deploy_body] - -# [START howto_operator_gcf_default_args] -default_args: dict[str, Any] = {'retries': 3} -# [END howto_operator_gcf_default_args] - -# [START howto_operator_gcf_deploy_variants] -if GCF_SOURCE_ARCHIVE_URL: - body['sourceArchiveUrl'] = GCF_SOURCE_ARCHIVE_URL -elif GCF_SOURCE_REPOSITORY: - body['sourceRepository'] = {'url': GCF_SOURCE_REPOSITORY} -elif GCF_ZIP_PATH: - body['sourceUploadUrl'] = '' - default_args['zip_path'] = GCF_ZIP_PATH -elif GCF_SOURCE_UPLOAD_URL: - body['sourceUploadUrl'] = GCF_SOURCE_UPLOAD_URL -else: - raise Exception("Please provide one of the source_code parameters") -# [END howto_operator_gcf_deploy_variants] - - -with models.DAG( - 'example_gcp_function', - default_args=default_args, - start_date=datetime(2021, 1, 1), - catchup=False, - tags=['example'], -) as dag: - # [START howto_operator_gcf_deploy] - deploy_task = CloudFunctionDeployFunctionOperator( - task_id="gcf_deploy_task", - project_id=GCP_PROJECT_ID, - location=GCP_LOCATION, - body=body, - validate_body=GCP_VALIDATE_BODY, - ) - # [END howto_operator_gcf_deploy] - # [START howto_operator_gcf_deploy_no_project_id] - deploy2_task = CloudFunctionDeployFunctionOperator( - task_id="gcf_deploy2_task", location=GCP_LOCATION, body=body, validate_body=GCP_VALIDATE_BODY - ) - # [END howto_operator_gcf_deploy_no_project_id] - # [START howto_operator_gcf_invoke_function] - invoke_task = CloudFunctionInvokeFunctionOperator( - task_id="invoke_task", - project_id=GCP_PROJECT_ID, - location=GCP_LOCATION, - input_data={}, - function_id=GCF_SHORT_FUNCTION_NAME, - ) - # [END howto_operator_gcf_invoke_function] - # [START howto_operator_gcf_delete] - delete_task = CloudFunctionDeleteFunctionOperator(task_id="gcf_delete_task", name=FUNCTION_NAME) - # [END howto_operator_gcf_delete] - deploy_task >> deploy2_task >> invoke_task >> delete_task diff --git a/airflow/providers/google/cloud/links/cloud_functions.py b/airflow/providers/google/cloud/links/cloud_functions.py index ec3fd023502e0..1cb8349607c93 100644 --- a/airflow/providers/google/cloud/links/cloud_functions.py +++ b/airflow/providers/google/cloud/links/cloud_functions.py @@ -16,6 +16,9 @@ # specific language governing permissions and limitations # under the License. 
"""This module contains Google Cloud Functions links.""" + +from __future__ import annotations + from typing import TYPE_CHECKING from airflow.models import BaseOperator @@ -43,7 +46,7 @@ class CloudFunctionsDetailsLink(BaseGoogleLink): @staticmethod def persist( - context: "Context", + context: Context, task_instance: BaseOperator, function_name: str, location: str, @@ -66,7 +69,7 @@ class CloudFunctionsListLink(BaseGoogleLink): @staticmethod def persist( - context: "Context", + context: Context, task_instance: BaseOperator, project_id: str, ): diff --git a/tests/system/providers/google/cloud/cloud_functions/example_functions.py b/tests/system/providers/google/cloud/cloud_functions/example_functions.py index 8257d34214a88..9a33f2857e8ea 100644 --- a/tests/system/providers/google/cloud/cloud_functions/example_functions.py +++ b/tests/system/providers/google/cloud/cloud_functions/example_functions.py @@ -40,9 +40,11 @@ """ +from __future__ import annotations + import os from datetime import datetime -from typing import Any, Dict +from typing import Any from airflow import models from airflow.models.baseoperator import chain @@ -80,7 +82,7 @@ # [END howto_operator_gcf_deploy_body] # [START howto_operator_gcf_default_args] -default_args: Dict[str, Any] = {'retries': 3} +default_args: dict[str, Any] = {'retries': 3} # [END howto_operator_gcf_default_args] # [START howto_operator_gcf_deploy_variants]