Added support of Teradata Compute Cluster Provision, Decommission, Suspend and Resume operations #40509

Merged: 2 commits, Jul 2, 2024
513 changes: 513 additions & 0 deletions airflow/providers/teradata/operators/teradata_compute_cluster.py

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions airflow/providers/teradata/provider.yaml
@@ -50,13 +50,15 @@ integrations:
    external-doc-url: https://www.teradata.com/
    how-to-guide:
      - /docs/apache-airflow-providers-teradata/operators/teradata.rst
      - /docs/apache-airflow-providers-teradata/operators/compute_cluster.rst
    logo: /integration-logos/teradata/Teradata.png
    tags: [software]

operators:
  - integration-name: Teradata
    python-modules:
      - airflow.providers.teradata.operators.teradata
      - airflow.providers.teradata.operators.teradata_compute_cluster

hooks:
  - integration-name: Teradata
@@ -80,3 +82,8 @@ transfers:
connection-types:
  - hook-class-name: airflow.providers.teradata.hooks.teradata.TeradataHook
    connection-type: teradata

triggers:
  - integration-name: Teradata
    python-modules:
      - airflow.providers.teradata.triggers.teradata_compute_cluster
16 changes: 16 additions & 0 deletions airflow/providers/teradata/triggers/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
155 changes: 155 additions & 0 deletions airflow/providers/teradata/triggers/teradata_compute_cluster.py
@@ -0,0 +1,155 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator

from airflow.exceptions import AirflowException
from airflow.providers.common.sql.hooks.sql import fetch_one_handler
from airflow.providers.teradata.hooks.teradata import TeradataHook
from airflow.providers.teradata.utils.constants import Constants
from airflow.triggers.base import BaseTrigger, TriggerEvent


class TeradataComputeClusterSyncTrigger(BaseTrigger):
    """
    Fetch the status of the suspend or resume operation for the specified compute cluster.

    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param compute_profile_name: Name of the Compute Profile to manage.
    :param compute_group_name: Name of the compute group to which the compute profile belongs.
    :param operation_type: Compute cluster operation: CREATE, CREATE_SUSPEND, SUSPEND or RESUME.
    :param poll_interval: Polling period in seconds between status checks;
        defaults to ``Constants.CC_POLL_INTERVAL``.
    """

    def __init__(
        self,
        teradata_conn_id: str,
        compute_profile_name: str,
        compute_group_name: str | None = None,
        operation_type: str | None = None,
        poll_interval: float | None = None,
    ):
        super().__init__()
        self.teradata_conn_id = teradata_conn_id
        self.compute_profile_name = compute_profile_name
        self.compute_group_name = compute_group_name
        self.operation_type = operation_type
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize TeradataComputeClusterSyncTrigger arguments and classpath."""
        return (
            "airflow.providers.teradata.triggers.teradata_compute_cluster.TeradataComputeClusterSyncTrigger",
            {
                "teradata_conn_id": self.teradata_conn_id,
                "compute_profile_name": self.compute_profile_name,
                "compute_group_name": self.compute_group_name,
                "operation_type": self.operation_type,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Wait for Compute Cluster operation to complete."""
        # Map the requested operation to the database state that signals completion.
        if self.operation_type in (Constants.CC_SUSPEND_OPR, Constants.CC_CREATE_SUSPEND_OPR):
            expected_status = Constants.CC_SUSPEND_DB_STATUS
        elif self.operation_type in (Constants.CC_RESUME_OPR, Constants.CC_CREATE_OPR):
            expected_status = Constants.CC_RESUME_DB_STATUS
        else:
            # Reject unknown operations up front; otherwise the polling loop would never exit.
            yield TriggerEvent({"status": "error", "message": "Invalid operation"})
            return
        # asyncio.sleep() takes seconds; fall back to the provider default when unset.
        poll_interval = float(
            self.poll_interval if self.poll_interval is not None else Constants.CC_POLL_INTERVAL
        )
        try:
            while True:
                status = await self.get_status()
                if not status:
                    self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
                    raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
                if status == expected_status:
                    break
                await asyncio.sleep(poll_interval)
            yield TriggerEvent(
                {
                    "status": "success",
                    "message": Constants.CC_OPR_SUCCESS_STATUS_MSG
                    % (self.compute_profile_name, self.operation_type),
                }
            )
        except asyncio.CancelledError:
            # Log the cancellation, then re-raise so the event loop can finish cancelling the task.
            self.log.error(Constants.CC_OPR_TIMEOUT_ERROR, self.operation_type)
            raise
        except Exception as e:
            yield TriggerEvent({"status": "error", "message": str(e)})

    async def get_status(self) -> str:
        """Return compute cluster SUSPEND/RESUME operation status."""
        # NOTE: the names are concatenated into the SQL text, so they must come from a trusted source.
        sql = (
            "SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"
            + self.compute_profile_name
            + "')"
        )
        if self.compute_group_name:
            sql += " AND UPPER(ComputeGroupName) = UPPER('" + self.compute_group_name + "')"
        hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        # NOTE: hook.run() is synchronous, so the event loop is blocked while the query runs.
        result_set = hook.run(sql, handler=fetch_one_handler)
        status = ""
        # Guard against an empty result before indexing into it.
        if isinstance(result_set, list) and result_set and isinstance(result_set[0], str):
            status = str(result_set[0])
        return status
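
Review note: `get_status()` builds its query by concatenating the profile and group names into the SQL text. One possible alternative, sketched below, keeps the names out of the SQL string by using `?` placeholders. `build_status_query` is a hypothetical helper and not part of this PR. The sketch assumes the Teradata driver accepts qmark-style placeholders, in which case the result could be passed on as `hook.run(sql, parameters=params, handler=fetch_one_handler)`; that has not been verified against this provider.

```python
from __future__ import annotations


def build_status_query(
    compute_profile_name: str, compute_group_name: str | None = None
) -> tuple[str, tuple[str, ...]]:
    """Return the status query and its bind values (hypothetical helper)."""
    sql = (
        "SEL ComputeProfileState FROM DBC.ComputeProfilesVX "
        "WHERE UPPER(ComputeProfileName) = UPPER(?)"
    )
    params: tuple[str, ...] = (compute_profile_name,)
    if compute_group_name:
        # The group filter is optional, exactly as in get_status().
        sql += " AND UPPER(ComputeGroupName) = UPPER(?)"
        params += (compute_group_name,)
    return sql, params


# Example names are placeholders, not values from the PR.
sql, params = build_status_query("my_profile", "my_group")
```

With this shape the names travel as bind values, so quoting inside a profile or group name cannot change the structure of the statement.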
16 changes: 16 additions & 0 deletions airflow/providers/teradata/utils/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
46 changes: 46 additions & 0 deletions airflow/providers/teradata/utils/constants.py
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations


class Constants:
    """Define constants for Teradata Provider."""

    # Compute cluster operation types.
    CC_CREATE_OPR = "CREATE"
    CC_CREATE_SUSPEND_OPR = "CREATE_SUSPEND"
    CC_DROP_OPR = "DROP"
    CC_SUSPEND_OPR = "SUSPEND"
    CC_RESUME_OPR = "RESUME"
    # Compute profile states reported by the database.
    CC_INITIALIZE_DB_STATUS = "Initializing"
    CC_SUSPEND_DB_STATUS = "Suspended"
    CC_RESUME_DB_STATUS = "Running"
    # Messages; %s placeholders are filled by the caller.
    CC_OPR_SUCCESS_STATUS_MSG = "Compute Cluster %s %s operation completed successfully."
    CC_OPR_FAILURE_STATUS_MSG = "Compute Cluster %s %s operation has failed."
    CC_OPR_INITIALIZING_STATUS_MSG = "The environment is currently initializing. Please wait."
    CC_OPR_EMPTY_PROFILE_ERROR_MSG = "Please provide a valid name for the compute cluster profile."
    CC_GRP_PRP_NON_EXISTS_MSG = (
        "The specified compute cluster does not exist, or the user does not have permission to access it."
    )
    CC_GRP_PRP_UN_AUTHORIZED_MSG = "The %s operation is not authorized for the user."
    CC_GRP_LAKE_SUPPORT_ONLY_MSG = "Compute Groups are supported only on Vantage Cloud Lake."
    CC_OPR_TIMEOUT_ERROR = (
        "There is an issue with the %s operation. Kindly consult the administrator for assistance."
    )
    CC_GRP_PRP_EXISTS_MSG = "The specified compute cluster already exists."
    CC_OPR_EMPTY_COPY_PROFILE_ERROR_MSG = (
        "Please provide a valid name for the source and target compute profile."
    )
    CC_OPR_TIME_OUT = 1200
    # Default polling interval; the trigger passes it to asyncio.sleep(), so it is in seconds.
    CC_POLL_INTERVAL = 60
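
Review note: the constants above pair each operation type with the database state that marks it as complete (SUSPEND and CREATE_SUSPEND end in "Suspended", RESUME and CREATE end in "Running"). The sketch below shows that pairing and the polling loop built on it, in a form that runs without Airflow or a database. `TARGET_STATE`, `wait_for_state` and the fake status source are illustrative names introduced here, not part of the PR, and the string values are local copies of the constants.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

# Local copies of the values defined in Constants, so the sketch runs standalone.
TARGET_STATE = {
    "SUSPEND": "Suspended",
    "CREATE_SUSPEND": "Suspended",
    "RESUME": "Running",
    "CREATE": "Running",
}


async def wait_for_state(
    get_status: Callable[[], Awaitable[str]],
    operation_type: str,
    poll_interval: float = 60.0,
) -> str:
    """Poll get_status() until the state expected for operation_type is reported."""
    expected = TARGET_STATE.get(operation_type)
    if expected is None:
        # Fail before polling; an unknown operation has no state to wait for.
        raise ValueError(f"Invalid operation: {operation_type}")
    while True:
        status = await get_status()
        if not status:
            raise LookupError("compute cluster not found or not accessible")
        if status == expected:
            return status
        await asyncio.sleep(poll_interval)


async def _demo() -> str:
    # Fake status source: reports "Initializing" twice, then "Running".
    states = iter(["Initializing", "Initializing", "Running"])

    async def fake_status() -> str:
        return next(states)

    return await wait_for_state(fake_status, "RESUME", poll_interval=0.01)


result = asyncio.run(_demo())
print(result)  # Running
```

Resolving the expected state before entering the loop means an unsupported operation type fails immediately instead of polling forever.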
107 changes: 107 additions & 0 deletions docs/apache-airflow-providers-teradata/operators/compute_cluster.rst
@@ -0,0 +1,107 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.


.. _howto/operator:TeradataComputeClusterProvisionOperator:


=======================================
TeradataComputeClusterProvisionOperator
=======================================

Use the :class:`~airflow.providers.teradata.operators.teradata_compute_cluster.TeradataComputeClusterProvisionOperator`
to provision a new Teradata Vantage Cloud Lake Compute Cluster with the specified Compute Group Name
and Compute Profile Name.

The following example provisions a new Compute Cluster in Teradata Vantage Cloud Lake:

.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py
    :language: python
    :start-after: [START teradata_vantage_lake_compute_cluster_provision_howto_guide]
    :end-before: [END teradata_vantage_lake_compute_cluster_provision_howto_guide]


.. _howto/operator:TeradataComputeClusterDecommissionOperator:


==========================================
TeradataComputeClusterDecommissionOperator
==========================================

Use the :class:`~airflow.providers.teradata.operators.teradata_compute_cluster.TeradataComputeClusterDecommissionOperator`
to decommission the specified Teradata Vantage Cloud Lake Compute Cluster.

The following example decommissions the specified Compute Cluster in Teradata Vantage Cloud Lake:

.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py
    :language: python
    :start-after: [START teradata_vantage_lake_compute_cluster_decommission_howto_guide]
    :end-before: [END teradata_vantage_lake_compute_cluster_decommission_howto_guide]


.. _howto/operator:TeradataComputeClusterResumeOperator:


=====================================
TeradataComputeClusterResumeOperator
=====================================

Use the :class:`~airflow.providers.teradata.operators.teradata_compute_cluster.TeradataComputeClusterResumeOperator`
to start the Teradata Vantage Cloud Lake Compute Cluster identified by the specified Compute Group Name
and Compute Profile Name.

The following example uses ``TeradataComputeClusterResumeOperator`` to start the specified Compute Cluster
in Teradata Vantage Cloud Lake:

.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py
    :language: python
    :start-after: [START teradata_vantage_lake_compute_cluster_resume_howto_guide]
    :end-before: [END teradata_vantage_lake_compute_cluster_resume_howto_guide]

.. _howto/operator:TeradataComputeClusterSuspendOperator:


=====================================
TeradataComputeClusterSuspendOperator
=====================================

Use the :class:`~airflow.providers.teradata.operators.teradata_compute_cluster.TeradataComputeClusterSuspendOperator`
to suspend the Teradata Vantage Cloud Lake Compute Cluster identified by the specified Compute Group Name
and Compute Profile Name.

The following example uses ``TeradataComputeClusterSuspendOperator`` to suspend the specified Compute Cluster
in Teradata Vantage Cloud Lake:

.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py
    :language: python
    :start-after: [START teradata_vantage_lake_compute_cluster_suspend_howto_guide]
    :end-before: [END teradata_vantage_lake_compute_cluster_suspend_howto_guide]