Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloud SQL assets & system tests migration (AIP-47) #23583

Merged
merged 8 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions airflow/providers/google/cloud/links/cloud_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Cloud SQL links."""
from typing import TYPE_CHECKING, Optional

from airflow.models import BaseOperator
from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context


CLOUD_SQL_BASE_LINK = "https://console.cloud.google.com/sql"
CLOUD_SQL_INSTANCE_LINK = CLOUD_SQL_BASE_LINK + "/instances/{instance}/overview?project={project_id}"
CLOUD_SQL_INSTANCE_DATABASE_LINK = (
CLOUD_SQL_BASE_LINK + "/instances/{instance}/databases?project={project_id}"
)


class CloudSQLInstanceLink(BaseGoogleLink):
"""Helper class for constructing Cloud SQL Instance Link"""

name = "Cloud SQL Instance"
key = "cloud_sql_instance"
format_str = CLOUD_SQL_INSTANCE_LINK

@staticmethod
def persist(
context: "Context",
task_instance: BaseOperator,
cloud_sql_instance: str,
project_id: Optional[str],
):
task_instance.xcom_push(
context,
key=CloudSQLInstanceLink.key,
value={"instance": cloud_sql_instance, "project_id": project_id},
)


class CloudSQLInstanceDatabaseLink(BaseGoogleLink):
"""Helper class for constructing Cloud SQL Instance Database Link"""

name = "Cloud SQL Instance Database"
key = "cloud_sql_instance_database"
format_str = CLOUD_SQL_INSTANCE_DATABASE_LINK

@staticmethod
def persist(
context: "Context",
task_instance: BaseOperator,
cloud_sql_instance: str,
project_id: Optional[str],
):
task_instance.xcom_push(
context,
key=CloudSQLInstanceDatabaseLink.key,
value={"instance": cloud_sql_instance, "project_id": project_id},
)
58 changes: 58 additions & 0 deletions airflow/providers/google/cloud/operators/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator, Connection
from airflow.providers.google.cloud.hooks.cloud_sql import CloudSQLDatabaseHook, CloudSQLHook
from airflow.providers.google.cloud.links.cloud_sql import CloudSQLInstanceDatabaseLink, CloudSQLInstanceLink
from airflow.providers.google.cloud.utils.field_validator import GcpBodyFieldValidator
from airflow.providers.google.common.links.storage import FileDetailsLink
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

Expand Down Expand Up @@ -316,6 +318,7 @@ class CloudSQLCreateInstanceOperator(CloudSQLBaseOperator):
'impersonation_chain',
)
# [END gcp_sql_create_template_fields]
operator_extra_links = (CloudSQLInstanceLink(),)

def __init__(
self,
Expand Down Expand Up @@ -363,6 +366,13 @@ def execute(self, context: 'Context') -> None:
else:
self.log.info("Cloud SQL instance with ID %s already exists. Aborting create.", self.instance)

CloudSQLInstanceLink.persist(
context=context,
task_instance=self,
cloud_sql_instance=self.instance,
project_id=self.project_id or hook.project_id,
)

instance_resource = hook.get_instance(project_id=self.project_id, instance=self.instance)
service_account_email = instance_resource["serviceAccountEmailAddress"]
task_instance = context['task_instance']
Expand Down Expand Up @@ -411,6 +421,7 @@ class CloudSQLInstancePatchOperator(CloudSQLBaseOperator):
'impersonation_chain',
)
# [END gcp_sql_patch_template_fields]
operator_extra_links = (CloudSQLInstanceLink(),)

def __init__(
self,
Expand Down Expand Up @@ -450,6 +461,13 @@ def execute(self, context: 'Context'):
'Please specify another instance to patch.'
)
else:
CloudSQLInstanceLink.persist(
context=context,
task_instance=self,
cloud_sql_instance=self.instance,
project_id=self.project_id or hook.project_id,
)

return hook.patch_instance(project_id=self.project_id, body=self.body, instance=self.instance)


Expand Down Expand Up @@ -535,6 +553,7 @@ class CloudSQLCreateInstanceDatabaseOperator(CloudSQLBaseOperator):
'impersonation_chain',
)
# [END gcp_sql_db_create_template_fields]
operator_extra_links = (CloudSQLInstanceDatabaseLink(),)

def __init__(
self,
Expand Down Expand Up @@ -585,6 +604,12 @@ def execute(self, context: 'Context') -> Optional[bool]:
api_version=self.api_version,
impersonation_chain=self.impersonation_chain,
)
CloudSQLInstanceDatabaseLink.persist(
context=context,
task_instance=self,
cloud_sql_instance=self.instance,
project_id=self.project_id or hook.project_id,
)
if self._check_if_db_exists(database, hook):
self.log.info(
"Cloud SQL instance with ID %s already contains database '%s'. Aborting database insert.",
Expand Down Expand Up @@ -635,6 +660,7 @@ class CloudSQLPatchInstanceDatabaseOperator(CloudSQLBaseOperator):
'impersonation_chain',
)
# [END gcp_sql_db_patch_template_fields]
operator_extra_links = (CloudSQLInstanceDatabaseLink(),)

def __init__(
self,
Expand Down Expand Up @@ -687,6 +713,12 @@ def execute(self, context: 'Context') -> None:
"Please specify another database to patch."
)
else:
CloudSQLInstanceDatabaseLink.persist(
context=context,
task_instance=self,
cloud_sql_instance=self.instance,
project_id=self.project_id or hook.project_id,
)
return hook.patch_database(
project_id=self.project_id, instance=self.instance, database=self.database, body=self.body
)
Expand Down Expand Up @@ -811,6 +843,7 @@ class CloudSQLExportInstanceOperator(CloudSQLBaseOperator):
'impersonation_chain',
)
# [END gcp_sql_export_template_fields]
operator_extra_links = (CloudSQLInstanceLink(), FileDetailsLink())

def __init__(
self,
Expand Down Expand Up @@ -853,6 +886,18 @@ def execute(self, context: 'Context') -> None:
api_version=self.api_version,
impersonation_chain=self.impersonation_chain,
)
CloudSQLInstanceLink.persist(
context=context,
task_instance=self,
cloud_sql_instance=self.instance,
project_id=self.project_id or hook.project_id,
)
FileDetailsLink.persist(
context=context,
task_instance=self,
uri=self.body["exportContext"]["uri"][5:],
project_id=self.project_id or hook.project_id,
)
return hook.export_instance(project_id=self.project_id, instance=self.instance, body=self.body)


Expand Down Expand Up @@ -908,6 +953,7 @@ class CloudSQLImportInstanceOperator(CloudSQLBaseOperator):
'impersonation_chain',
)
# [END gcp_sql_import_template_fields]
operator_extra_links = (CloudSQLInstanceLink(), FileDetailsLink())

def __init__(
self,
Expand Down Expand Up @@ -950,6 +996,18 @@ def execute(self, context: 'Context') -> None:
api_version=self.api_version,
impersonation_chain=self.impersonation_chain,
)
CloudSQLInstanceLink.persist(
context=context,
task_instance=self,
cloud_sql_instance=self.instance,
project_id=self.project_id or hook.project_id,
)
FileDetailsLink.persist(
context=context,
task_instance=self,
uri=self.body["importContext"]["uri"][5:],
project_id=self.project_id or hook.project_id,
)
return hook.import_instance(project_id=self.project_id, instance=self.instance, body=self.body)


Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,8 @@ extra-links:
- airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
- airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceLink
- airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceDatabaseLink
- airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
- airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
- airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
Expand Down
36 changes: 18 additions & 18 deletions docs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_db_create]
:end-before: [END howto_operator_cloudsql_db_create]

Example request body:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:start-after: [START howto_operator_cloudsql_db_create_body]
:end-before: [END howto_operator_cloudsql_db_create_body]
Expand Down Expand Up @@ -87,7 +87,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_db_delete]
Expand Down Expand Up @@ -127,15 +127,15 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_db_patch]
:end-before: [END howto_operator_cloudsql_db_patch]

Example request body:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:start-after: [START howto_operator_cloudsql_db_patch_body]
:end-before: [END howto_operator_cloudsql_db_patch_body]
Expand Down Expand Up @@ -174,7 +174,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_delete]
Expand All @@ -183,7 +183,7 @@ it will be retrieved from the Google Cloud connection used. Both variants are sh
Note: If the instance has read or failover replicas you need to delete them before you delete the primary instance.
Replicas are deleted the same way as primary instances:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_replicas_delete]
Expand Down Expand Up @@ -224,7 +224,7 @@ Arguments

Example body defining the export operation:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:start-after: [START howto_operator_cloudsql_export_body]
:end-before: [END howto_operator_cloudsql_export_body]
Expand All @@ -235,7 +235,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_export]
Expand Down Expand Up @@ -269,7 +269,7 @@ To grant the service account with the appropriate WRITE permissions for the GCS
you can use the :class:`~airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator`,
as shown in the example:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_export_gcs_permissions]
Expand Down Expand Up @@ -309,7 +309,7 @@ Arguments

Example body defining the import operation:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:start-after: [START howto_operator_cloudsql_import_body]
:end-before: [END howto_operator_cloudsql_import_body]
Expand All @@ -320,7 +320,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_import]
Expand Down Expand Up @@ -354,7 +354,7 @@ To grant the service account with the appropriate READ permissions for the GCS o
you can use the :class:`~airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator`,
as shown in the example:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_import_gcs_permissions]
Expand All @@ -380,14 +380,14 @@ Arguments

Example body defining the instance with failover replica:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:start-after: [START howto_operator_cloudsql_create_body]
:end-before: [END howto_operator_cloudsql_create_body]

Example body defining read replica for the instance above:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:start-after: [START howto_operator_cloudsql_create_replica]
:end-before: [END howto_operator_cloudsql_create_replica]
Expand All @@ -401,7 +401,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_create]
Expand Down Expand Up @@ -441,7 +441,7 @@ Arguments

Example body defining the instance:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:start-after: [START howto_operator_cloudsql_patch_body]
:end-before: [END howto_operator_cloudsql_patch_body]
Expand All @@ -452,7 +452,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_patch]
Expand Down
Loading