Skip to content

Commit

Permalink
Cloud Functions Operators assets & system tests migration (AIP-47) (#…
Browse files Browse the repository at this point in the history
…26073)

* Cloud Functions Operators assets & system tests migration (AIP-47)
  • Loading branch information
bkossakowska authored Sep 19, 2022
1 parent cc4f245 commit 6045f7a
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 81 deletions.
80 changes: 80 additions & 0 deletions airflow/providers/google/cloud/links/cloud_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Cloud Functions links."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context


CLOUD_FUNCTIONS_BASE_LINK = "https://console.cloud.google.com/functions"

CLOUD_FUNCTIONS_DETAILS_LINK = (
CLOUD_FUNCTIONS_BASE_LINK + "/details/{location}/{function_name}?project={project_id}"
)

CLOUD_FUNCTIONS_LIST_LINK = CLOUD_FUNCTIONS_BASE_LINK + "/list?project={project_id}"


class CloudFunctionsDetailsLink(BaseGoogleLink):
"""Helper class for constructing Cloud Functions Details Link"""

name = "Cloud Functions Details"
key = "cloud_functions_details"
format_str = CLOUD_FUNCTIONS_DETAILS_LINK

@staticmethod
def persist(
context: Context,
task_instance: BaseOperator,
function_name: str,
location: str,
project_id: str,
):

task_instance.xcom_push(
context,
key=CloudFunctionsDetailsLink.key,
value={"function_name": function_name, "location": location, "project_id": project_id},
)


class CloudFunctionsListLink(BaseGoogleLink):
"""Helper class for constructing Cloud Functions Details Link"""

name = "Cloud Functions List"
key = "cloud_functions_list"
format_str = CLOUD_FUNCTIONS_LIST_LINK

@staticmethod
def persist(
context: Context,
task_instance: BaseOperator,
project_id: str,
):
task_instance.xcom_push(
context,
key=CloudFunctionsDetailsLink.key,
value={"project_id": project_id},
)
36 changes: 36 additions & 0 deletions airflow/providers/google/cloud/operators/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.functions import CloudFunctionsHook
from airflow.providers.google.cloud.links.cloud_functions import (
CloudFunctionsDetailsLink,
CloudFunctionsListLink,
)
from airflow.providers.google.cloud.utils.field_validator import (
GcpBodyFieldValidator,
GcpFieldValidationException,
Expand Down Expand Up @@ -144,6 +148,7 @@ class CloudFunctionDeployFunctionOperator(BaseOperator):
'impersonation_chain',
)
# [END gcf_function_deploy_template_fields]
operator_extra_links = (CloudFunctionsDetailsLink(),)

def __init__(
self,
Expand Down Expand Up @@ -226,6 +231,15 @@ def execute(self, context: Context):
self._create_new_function(hook)
else:
self._update_function(hook)
project_id = self.project_id or hook.project_id
if project_id:
CloudFunctionsDetailsLink.persist(
context=context,
task_instance=self,
location=self.location,
project_id=project_id,
function_name=self.body['name'].split("/")[-1],
)


GCF_SOURCE_ARCHIVE_URL = 'sourceArchiveUrl'
Expand Down Expand Up @@ -347,6 +361,7 @@ class CloudFunctionDeleteFunctionOperator(BaseOperator):
'impersonation_chain',
)
# [END gcf_function_delete_template_fields]
operator_extra_links = (CloudFunctionsListLink(),)

def __init__(
self,
Expand All @@ -355,9 +370,11 @@ def __init__(
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
impersonation_chain: str | Sequence[str] | None = None,
project_id: str | None = None,
**kwargs,
) -> None:
self.name = name
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self.impersonation_chain = impersonation_chain
Expand All @@ -379,6 +396,13 @@ def execute(self, context: Context):
impersonation_chain=self.impersonation_chain,
)
try:
project_id = self.project_id or hook.project_id
if project_id:
CloudFunctionsListLink.persist(
context=context,
task_instance=self,
project_id=project_id,
)
return hook.delete_function(self.name)
except HttpError as e:
status = e.resp.status
Expand Down Expand Up @@ -423,6 +447,7 @@ class CloudFunctionInvokeFunctionOperator(BaseOperator):
'project_id',
'impersonation_chain',
)
operator_extra_links = (CloudFunctionsDetailsLink(),)

def __init__(
self,
Expand Down Expand Up @@ -460,4 +485,15 @@ def execute(self, context: Context):
)
self.log.info('Function called successfully. Execution id %s', result.get('executionId'))
self.xcom_push(context=context, key='execution_id', value=result.get('executionId'))

project_id = self.project_id or hook.project_id
if project_id:
CloudFunctionsDetailsLink.persist(
context=context,
task_instance=self,
location=self.location,
project_id=project_id,
function_name=self.function_id,
)

return result
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,8 @@ extra-links:
- airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggersListLink
- airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggerDetailsLink
- airflow.providers.google.cloud.links.life_sciences.LifeSciencesLink
- airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsDetailsLink
- airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsListLink
- airflow.providers.google.common.links.storage.StorageLink
- airflow.providers.google.common.links.storage.FileDetailsLink

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcf_delete]
Expand Down Expand Up @@ -77,7 +77,7 @@ Arguments
When a DAG is created, the default_args dictionary can be used to pass
arguments common with other tasks:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py
:language: python
:start-after: [START howto_operator_gcf_default_args]
:end-before: [END howto_operator_gcf_default_args]
Expand All @@ -101,19 +101,19 @@ Using the operator
Depending on the combination of parameters, the Function's source code can be obtained
from different sources:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py
:language: python
:start-after: [START howto_operator_gcf_deploy_body]
:end-before: [END howto_operator_gcf_deploy_body]

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py
:language: python
:start-after: [START howto_operator_gcf_deploy_variants]
:end-before: [END howto_operator_gcf_deploy_variants]

The code to create the operator:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcf_deploy]
Expand All @@ -122,7 +122,7 @@ The code to create the operator:
You can also create the operator without project id - project id will be retrieved
from the Google Cloud connection used:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_functions.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_functions/example_functions.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcf_deploy_no_project_id]
Expand Down
42 changes: 26 additions & 16 deletions tests/providers/google/cloud/operators/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def test_deploy_execute(self, mock_hook):
op = CloudFunctionDeployFunctionOperator(
project_id=GCP_PROJECT_ID, location=GCP_LOCATION, body=deepcopy(VALID_BODY), task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -113,7 +113,7 @@ def test_update_function_if_exists(self, mock_hook):
op = CloudFunctionDeployFunctionOperator(
project_id=GCP_PROJECT_ID, location=GCP_LOCATION, body=deepcopy(VALID_BODY), task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -137,7 +137,7 @@ def test_empty_project_id_is_ok(self, mock_hook):
operator = CloudFunctionDeployFunctionOperator(
location="test_region", body=deepcopy(VALID_BODY), task_id="id"
)
operator.execute(None)
operator.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand Down Expand Up @@ -176,7 +176,7 @@ def test_correct_runtime_field(self, runtime, mock_hook):
op = CloudFunctionDeployFunctionOperator(
project_id="test_project_id", location="test_region", body=body, task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -203,7 +203,7 @@ def test_valid_network_field(self, network, mock_hook):
op = CloudFunctionDeployFunctionOperator(
project_id="test_project_id", location="test_region", body=body, task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -229,7 +229,7 @@ def test_valid_labels_field(self, labels, mock_hook):
op = CloudFunctionDeployFunctionOperator(
project_id="test_project_id", location="test_region", body=body, task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -244,7 +244,7 @@ def test_validation_disabled(self, mock_hook):
op = CloudFunctionDeployFunctionOperator(
project_id="test_project_id", location="test_region", body=body, validate_body=False, task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand Down Expand Up @@ -421,7 +421,7 @@ def test_valid_source_code_union_field(self, source_code, project_id, mock_hook)
op = CloudFunctionDeployFunctionOperator(
location="test_region", body=body, task_id="id", zip_path=zip_path
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1', gcp_conn_id='google_cloud_default', impersonation_chain=None,
)
Expand Down Expand Up @@ -548,7 +548,7 @@ def test_valid_trigger_union_field(self, trigger, mock_hook):
body=body,
task_id="id",
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -570,7 +570,7 @@ def test_extra_parameter(self, mock_hook):
op = CloudFunctionDeployFunctionOperator(
project_id="test_project_id", location="test_region", body=body, task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand Down Expand Up @@ -600,7 +600,7 @@ class TestGcfFunctionDelete(unittest.TestCase):
def test_delete_execute(self, mock_hook):
mock_hook.return_value.delete_function.return_value = self._DELETE_FUNCTION_EXPECTED
op = CloudFunctionDeleteFunctionOperator(name=self._FUNCTION_NAME, task_id="id")
result = op.execute(None)
result = op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -616,7 +616,7 @@ def test_correct_name(self, mock_hook):
op = CloudFunctionDeleteFunctionOperator(
name="projects/project_name/locations/project_location/functions/function_name", task_id="id"
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand Down Expand Up @@ -647,7 +647,7 @@ def test_gcf_error_silenced_when_function_doesnt_exist(self, mock_hook):
mock_hook.return_value.delete_function.side_effect = mock.Mock(
side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found')
)
op.execute(None)
op.execute(context=mock.MagicMock())
mock_hook.assert_called_once_with(
api_version='v1',
gcp_conn_id='google_cloud_default',
Expand All @@ -666,7 +666,7 @@ def test_non_404_gcf_error_bubbled_up(self, mock_hook):
)

with pytest.raises(HttpError):
op.execute(None)
op.execute(context=mock.MagicMock())

mock_hook.assert_called_once_with(
api_version='v1',
Expand Down Expand Up @@ -701,7 +701,9 @@ def test_execute(self, mock_gcf_hook, mock_xcom):
gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain,
)
op.execute(None)
context = mock.MagicMock()
op.execute(context=context)

mock_gcf_hook.assert_called_once_with(
api_version=api_version,
gcp_conn_id=gcp_conn_id,
Expand All @@ -712,4 +714,12 @@ def test_execute(self, mock_gcf_hook, mock_xcom):
function_id=function_id, input_data=payload, location=GCP_LOCATION, project_id=GCP_PROJECT_ID
)

mock_xcom.assert_called_once_with(context=None, key='execution_id', value=exec_id)
mock_xcom.assert_called_with(
context,
key="cloud_functions_details",
value={
'location': GCP_LOCATION,
'function_name': function_id,
'project_id': GCP_PROJECT_ID,
},
)
Loading

0 comments on commit 6045f7a

Please sign in to comment.