12 changes: 11 additions & 1 deletion providers/teradata/docs/operators/bteq.rst
@@ -22,7 +22,7 @@ BteqOperator

The :class:`~airflow.providers.teradata.operators.bteq.BteqOperator` enables execution of SQL statements or BTEQ (Basic Teradata Query) scripts using the Teradata BTEQ utility, which can be installed either locally or accessed remotely via SSH.

This is useful for executing administrative operations, batch queries, or ETL tasks in Teradata environments using the Teradata BTEQ utility.
This is useful for executing administrative operations, batch queries, or ELT tasks in Teradata environments using the Teradata BTEQ utility.

.. note::

@@ -47,6 +47,15 @@ Make sure your Teradata Airflow connection is defined with the required fields:

You can define a remote host with a separate SSH connection using the ``ssh_conn_id``.


Ensure that the Teradata BTEQ utility is installed on the machine where the SQL statements or scripts will be executed. This could be:

- The **local machine** where Airflow runs the task, for local execution.
- The **remote host** accessed via SSH, for remote execution.

If executing remotely, also ensure that an SSH server (e.g., ``sshd``) is running and accessible on the remote machine.


.. note::

For improved security, it is **highly recommended** to use
@@ -59,6 +68,7 @@ You can define a remote host with a separate SSH connection using the ``ssh_conn
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/ssh.html



To execute arbitrary SQL or BTEQ commands in a Teradata database, use the
:class:`~airflow.providers.teradata.operators.bteq.BteqOperator`.
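
A minimal usage sketch, assuming placeholder connection IDs (``ssh_conn_id`` is optional and only needed for remote execution):

.. code-block:: python

    from airflow.providers.teradata.operators.bteq import BteqOperator

    run_bteq = BteqOperator(
        task_id="run_bteq",
        sql="SELECT CURRENT_DATE;",
        teradata_conn_id="teradata_default",
        ssh_conn_id="ssh_default",  # optional: run BTEQ on a remote host over SSH
    )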

30 changes: 14 additions & 16 deletions providers/teradata/src/airflow/providers/teradata/hooks/bteq.py
@@ -37,6 +37,7 @@
verify_bteq_installed,
verify_bteq_installed_remote,
)
from airflow.providers.teradata.utils.constants import Constants
from airflow.providers.teradata.utils.encryption_utils import (
decrypt_remote_file_to_string,
generate_encrypted_file_with_openssl,
@@ -158,7 +159,7 @@ def _transfer_to_and_execute_bteq_on_remote(
if self.ssh_hook and self.ssh_hook.get_conn():
with self.ssh_hook.get_conn() as ssh_client:
if ssh_client is None:
raise AirflowException("Failed to establish SSH connection. `ssh_client` is None.")
raise AirflowException(Constants.BTEQ_REMOTE_ERROR_MSG)
verify_bteq_installed_remote(ssh_client)
password = generate_random_password() # Encryption/Decryption password
encrypted_file_path = os.path.join(tmp_dir, "bteq_script.enc")
@@ -170,7 +171,6 @@
)
remote_encrypted_path = os.path.join(remote_working_dir or "", "bteq_script.enc")
remote_encrypted_path = remote_encrypted_path.replace("/", "\\")

transfer_file_sftp(ssh_client, encrypted_file_path, remote_encrypted_path)

bteq_command_str = prepare_bteq_command_for_remote_execution(
@@ -204,24 +204,20 @@ def _transfer_to_and_execute_bteq_on_remote(
else [bteq_quit_rc if bteq_quit_rc is not None else 0]
)
):
raise AirflowException(f"BTEQ task failed with error: {failure_message}")
raise AirflowException(f"Failed to execute BTEQ script : {failure_message}")
if failure_message:
self.log.warning(failure_message)
return exit_status
else:
raise AirflowException("SSH connection is not established. `ssh_hook` is None or invalid.")
raise AirflowException(Constants.BTEQ_REMOTE_ERROR_MSG)
except (OSError, socket.gaierror):
raise AirflowException(
"SSH connection timed out. Please check the network or server availability."
)
raise AirflowException(Constants.BTEQ_REMOTE_ERROR_MSG)
except SSHException as e:
raise AirflowException(f"An unexpected error occurred during SSH connection: {str(e)}")
raise AirflowException(f"{Constants.BTEQ_REMOTE_ERROR_MSG}: {str(e)}")
except AirflowException as e:
raise e
except Exception as e:
raise AirflowException(
f"An unexpected error occurred while executing BTEQ script on remote machine: {str(e)}"
)
raise AirflowException(f"{Constants.BTEQ_REMOTE_ERROR_MSG}: {str(e)}")
finally:
# Remove the local script file
if encrypted_file_path and os.path.exists(encrypted_file_path):
@@ -276,12 +272,12 @@ def execute_bteq_script_at_local(
process.wait(timeout=timeout + 60) # Adding 1 minute extra for BTEQ script timeout
except subprocess.TimeoutExpired:
self.on_kill()
raise AirflowException(f"BTEQ command timed out after {timeout} seconds.")
raise AirflowException(Constants.BTEQ_TIMEOUT_ERROR_MSG, timeout)
conn = self.get_conn()
conn["sp"] = process # For `on_kill` support
failure_message = None
if stdout_data is None:
raise AirflowException("Process stdout is None. Unable to read BTEQ output.")
raise AirflowException(Constants.BTEQ_UNEXPECTED_ERROR_MSG)
decoded_line = ""
for line in stdout_data.splitlines():
try:
@@ -302,7 +298,7 @@
else [bteq_quit_rc if bteq_quit_rc is not None else 0]
)
):
raise AirflowException(f"BTEQ task failed with error: {failure_message}")
raise AirflowException(f"{Constants.BTEQ_UNEXPECTED_ERROR_MSG}: {failure_message}")
if failure_message:
self.log.warning(failure_message)
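
The exit-status acceptance logic used in both the local path above and the remote path earlier distils to a small predicate; a standalone sketch (function name hypothetical):

    def is_acceptable_rc(exit_status, bteq_quit_rc):
        # Mirrors the inline check: bteq_quit_rc may be a single int, a
        # list/tuple of ints, or None; None means only exit status 0 passes.
        if isinstance(bteq_quit_rc, (list, tuple)):
            allowed = bteq_quit_rc
        else:
            allowed = [bteq_quit_rc if bteq_quit_rc is not None else 0]
        return exit_status in allowed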

@@ -320,7 +316,7 @@ def on_kill(self):
self.log.warning("Subprocess did not terminate in time. Forcing kill...")
process.kill()
except Exception as e:
self.log.error("Failed to terminate subprocess: %s", str(e))
self.log.error("%s : %s", Constants.BTEQ_UNEXPECTED_ERROR_MSG, str(e))

def get_airflow_home_dir(self) -> str:
"""Get the AIRFLOW_HOME directory."""
@@ -331,7 +327,9 @@ def preferred_temp_directory(self, prefix="bteq_"):
try:
temp_dir = tempfile.gettempdir()
if not os.path.isdir(temp_dir) or not os.access(temp_dir, os.W_OK):
raise OSError("OS temp dir not usable")
raise OSError(
f"Failed to execute the BTEQ script due to Temporary directory {temp_dir} is not writable."
)
except Exception:
temp_dir = self.get_airflow_home_dir()

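For context on the remote path: the script is encrypted locally with a one-off password (``generate_random_password``), transferred over SFTP, and decrypted on the remote host (``decrypt_remote_file_to_string``). The helpers' signatures are not part of this diff; a rough sketch of the local encryption step, assuming the standard ``openssl enc`` CLI and a hypothetical function name:

    import subprocess

    def encrypt_file_with_openssl(src_path: str, dest_path: str, password: str) -> None:
        # AES-256-CBC with PBKDF2 key derivation; the passphrase is fed via
        # stdin so it never shows up in the process list.
        subprocess.run(
            [
                "openssl", "enc", "-aes-256-cbc", "-salt", "-pbkdf2",
                "-pass", "stdin", "-in", src_path, "-out", dest_path,
            ],
            input=password.encode(),
            check=True,
        )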
providers/teradata/src/airflow/providers/teradata/operators/bteq.py
@@ -27,6 +27,7 @@
prepare_bteq_script_for_remote_execution,
read_file,
)
from airflow.providers.teradata.utils.constants import Constants

if TYPE_CHECKING:
from paramiko import SSHClient
@@ -35,7 +36,6 @@
from airflow.sdk.definitions.context import Context
except ImportError:
from airflow.utils.context import Context

from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.teradata.hooks.bteq import BteqHook
from airflow.providers.teradata.hooks.teradata import TeradataHook
@@ -114,9 +114,7 @@ def __init__(
def execute(self, context: Context) -> int | None:
"""Execute BTEQ code using the BteqHook."""
if not self.sql and not self.file_path:
raise ValueError(
"BteqOperator requires either the 'sql' or 'file_path' parameter. Both are missing."
)
raise ValueError(Constants.BTEQ_MISSED_PARAMS)
self._hook = BteqHook(teradata_conn_id=self.teradata_conn_id, ssh_conn_id=self.ssh_conn_id)
self._ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id) if self.ssh_conn_id else None

@@ -159,15 +157,13 @@ def execute(self, context: Context) -> int | None:
)
if self.file_path:
if not is_valid_file(self.file_path):
raise ValueError(
f"The provided file path '{self.file_path}' is invalid or does not exist."
)
raise ValueError(Constants.BTEQ_INVALID_PATH % self.file_path)
try:
is_valid_encoding(self.file_path, self.temp_file_read_encoding or "UTF-8")
except UnicodeDecodeError as e:
errmsg = f"The provided file '{self.file_path}' encoding is different from BTEQ I/O encoding i.e.'UTF-8'."
errmsg = Constants.BTEQ_INVALID_CHARSET % (self.file_path, "UTF-8")
if self.bteq_script_encoding:
errmsg = f"The provided file '{self.file_path}' encoding is different from the specified BTEQ I/O encoding '{self.bteq_script_encoding}'."
errmsg = Constants.BTEQ_INVALID_CHARSET % (self.file_path, self.bteq_script_encoding)
raise ValueError(errmsg) from e
return self._handle_local_bteq_file(
file_path=self.file_path,
@@ -200,13 +196,9 @@ def execute(self, context: Context) -> int | None:
file_path=self.file_path,
context=context,
)
raise ValueError(
f"The provided remote file path '{self.file_path}' is invalid or file does not exist on remote machine at given path."
)
raise ValueError(Constants.BTEQ_REMOTE_FILE_PATH_INVALID % self.file_path)
else:
raise ValueError(
"BteqOperator requires either the 'sql' or 'file_path' parameter. Both are missing."
)
raise ValueError(Constants.BTEQ_MISSED_PARAMS)
return None

def _handle_remote_bteq_file(
Expand Down Expand Up @@ -242,9 +234,7 @@ def _handle_remote_bteq_file(
self.temp_file_read_encoding,
)
return None
raise ValueError(
"Please provide a valid file path for the BTEQ script to be executed on the remote machine."
)
raise ValueError(Constants.BTEQ_MISSED_PARAMS)

def _handle_local_bteq_file(
self,
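``Constants`` (imported from ``airflow.providers.teradata.utils.constants``) is not shown in this diff; the ``%``-style formatting above implies message templates roughly of this shape. The attribute names are real, the wording is assumed:

    class Constants:
        # Illustrative wording; only the attribute names and the number of
        # %-placeholders are implied by the operator code above.
        BTEQ_MISSED_PARAMS = "BteqOperator requires either the 'sql' or 'file_path' parameter."
        BTEQ_INVALID_PATH = "The provided file path '%s' is invalid or does not exist."
        BTEQ_INVALID_CHARSET = "The provided file '%s' encoding differs from the BTEQ I/O encoding '%s'."
        BTEQ_REMOTE_FILE_PATH_INVALID = "The provided remote file path '%s' is invalid or does not exist on the remote machine."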
providers/teradata/src/airflow/providers/teradata/operators/teradata_compute_cluster.py
@@ -125,34 +125,40 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
"""
self._compute_cluster_execute_complete(event)

def _compute_cluster_execute(self):
def _compute_cluster_execute(self, operation: str | None = None):
# Verifies the provided compute profile name.
if (
self.compute_profile_name is None
or self.compute_profile_name == "None"
or self.compute_profile_name == ""
):
self.log.info("Invalid compute cluster profile name")
raise AirflowException(Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG)
# Verifies if the provided Teradata instance belongs to Vantage Cloud Lake.
lake_support_find_sql = "SELECT count(1) from DBC.StorageV WHERE StorageName='TD_OFSSTORAGE'"
lake_support_result = self.hook.run(lake_support_find_sql, handler=_single_result_row_handler)
if lake_support_result is None:
raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
raise AirflowException(Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG % operation)
try:
# Verifies if the provided Teradata instance belongs to Vantage Cloud Lake.
lake_support_find_sql = "SELECT count(1) from DBC.StorageV WHERE StorageName='TD_OFSSTORAGE'"
lake_support_result = self.hook.run(lake_support_find_sql, handler=_single_result_row_handler)
if lake_support_result is None:
raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG % operation)
except Exception:
raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG % operation)
# Getting teradata db version. Considering teradata instance is Lake when db version is 20 or above
db_version_get_sql = "SELECT InfoData AS Version FROM DBC.DBCInfoV WHERE InfoKey = 'VERSION'"
try:
db_version_result = self.hook.run(db_version_get_sql, handler=_single_result_row_handler)
if db_version_result is not None:
db_version_result = str(db_version_result)
db_version = db_version_result.split(".")[0]
# Safely extract the actual version string from the result
if isinstance(db_version_result, (list, tuple)) and db_version_result:
# e.g., if it's a tuple like ('17.10',), get the first element
version_str = str(db_version_result[0])
else:
version_str = str(db_version_result) # fallback, should be rare
db_version = version_str.split(".")[0]
if db_version is not None and int(db_version) < 20:
raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG % operation)
else:
raise AirflowException("Error occurred while getting teradata database version")
except Exception as ex:
self.log.error("Error occurred while getting teradata database version: %s ", str(ex))
raise AirflowException("Error occurred while getting teradata database version")
raise AirflowException(Constants.CC_ERR_VERSION_GET)
except Exception:
raise AirflowException(Constants.CC_ERR_VERSION_GET)

def _compute_cluster_execute_complete(self, event: dict[str, Any]) -> None:
if event["status"] == "success":
@@ -270,8 +276,8 @@ def execute(self, context: Context):
"""
return self._compute_cluster_execute()

def _compute_cluster_execute(self):
super()._compute_cluster_execute()
def _compute_cluster_execute(self, operation: str | None = None):
super()._compute_cluster_execute("Provision")
if self.compute_group_name:
cg_status_query = (
"SELECT count(1) FROM DBC.ComputeGroups WHERE UPPER(ComputeGroupName) = UPPER('"
@@ -300,7 +306,8 @@ def _compute_cluster_execute(self):
cp_status_result = self._hook_run(cp_status_query, handler=_single_result_row_handler)
if cp_status_result is not None:
cp_status_result = str(cp_status_result)
msg = f"Compute Profile {self.compute_profile_name} is already exists under Compute Group {self.compute_group_name}. Status is {cp_status_result}"
msg = f"Compute Profile '{self.compute_profile_name}' already exists under Compute Group '{self.compute_group_name}'. Status: {cp_status_result}."

self.log.info(msg)
return cp_status_result
create_cp_query = self._build_ccp_setup_query()
@@ -357,21 +364,22 @@ def execute(self, context: Context):
"""
return self._compute_cluster_execute()

def _compute_cluster_execute(self):
super()._compute_cluster_execute()
def _compute_cluster_execute(self, operation: str | None = None):
super()._compute_cluster_execute("Decommission")
cp_drop_query = "DROP COMPUTE PROFILE " + self.compute_profile_name
if self.compute_group_name:
cp_drop_query = cp_drop_query + " IN COMPUTE GROUP " + self.compute_group_name
self._hook_run(cp_drop_query, handler=_single_result_row_handler)
self.log.info(
"Compute Profile %s IN Compute Group %s is successfully dropped",
"Compute Profile %s in Compute Group %s is successfully dropped.",
self.compute_profile_name,
self.compute_group_name,
)
if self.delete_compute_group:

if self.delete_compute_group and self.compute_group_name:
cg_drop_query = "DROP COMPUTE GROUP " + self.compute_group_name
self._hook_run(cg_drop_query, handler=_single_result_row_handler)
self.log.info("Compute Group %s is successfully dropped", self.compute_group_name)
self.log.info("Compute Group %s is successfully dropped.", self.compute_group_name)


class TeradataComputeClusterResumeOperator(_TeradataComputeClusterOperator):
@@ -417,8 +425,8 @@ def execute(self, context: Context):
"""
return self._compute_cluster_execute()

def _compute_cluster_execute(self):
super()._compute_cluster_execute()
def _compute_cluster_execute(self, operation: str | None = None):
super()._compute_cluster_execute("Resume")
cc_status_query = (
"SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"
+ self.compute_profile_name
@@ -432,15 +440,16 @@ def _compute_cluster_execute(self):
# Generates an error message if the compute cluster does not exist for the specified
# compute profile and compute group.
else:
self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG % operation)
if cp_status_result != Constants.CC_RESUME_DB_STATUS:
cp_resume_query = f"RESUME COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"
if self.compute_group_name:
cp_resume_query = f"{cp_resume_query} IN COMPUTE GROUP {self.compute_group_name}"
return self._handle_cc_status(Constants.CC_RESUME_OPR, cp_resume_query)
self.log.info(
"Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_RESUME_DB_STATUS
"Compute Cluster %s is already in '%s' status.",
self.compute_profile_name,
Constants.CC_RESUME_DB_STATUS,
)


@@ -487,8 +496,8 @@ def execute(self, context: Context):
"""
return self._compute_cluster_execute()

def _compute_cluster_execute(self):
super()._compute_cluster_execute()
def _compute_cluster_execute(self, operation: str | None = None):
super()._compute_cluster_execute("Suspend")
sql = (
"SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"
+ self.compute_profile_name
@@ -502,13 +511,14 @@ def _compute_cluster_execute(self):
# Generates an error message if the compute cluster does not exist for the specified
# compute profile and compute group.
else:
self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)
raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG % operation)
if result != Constants.CC_SUSPEND_DB_STATUS:
sql = f"SUSPEND COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"
if self.compute_group_name:
sql = f"{sql} IN COMPUTE GROUP {self.compute_group_name}"
return self._handle_cc_status(Constants.CC_SUSPEND_OPR, sql)
self.log.info(
"Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_SUSPEND_DB_STATUS
"Compute Cluster %s is already in '%s' status.",
self.compute_profile_name,
Constants.CC_SUSPEND_DB_STATUS,
)
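
The version gate in ``_compute_cluster_execute`` treats an instance as VantageCloud Lake only when the database major version is 20 or higher. Distilled into a standalone predicate (name hypothetical), including the tuple unwrapping the new code adds:

    def is_lake_version(db_version_result) -> bool:
        # The row handler may return a bare string or a one-element tuple
        # such as ('20.00',); unwrap before parsing the major version.
        if isinstance(db_version_result, (list, tuple)) and db_version_result:
            version_str = str(db_version_result[0])
        else:
            version_str = str(db_version_result)
        return int(version_str.split(".")[0]) >= 20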