diff --git a/providers/teradata/docs/operators/bteq.rst b/providers/teradata/docs/operators/bteq.rst
index b3e61bbf5a6a6..5a32d242a7538 100644
--- a/providers/teradata/docs/operators/bteq.rst
+++ b/providers/teradata/docs/operators/bteq.rst
@@ -22,7 +22,7 @@ BteqOperator
 The :class:`~airflow.providers.teradata.operators.bteq.BteqOperator` enables execution of SQL statements or BTEQ (Basic Teradata Query) scripts using the Teradata BTEQ utility, which can be installed either locally or accessed remotely via SSH.
 
-This is useful for executing administrative operations, batch queries, or ETL tasks in Teradata environments using the Teradata BTEQ utility.
+This is useful for executing administrative operations, batch queries, or ELT tasks in Teradata environments using the Teradata BTEQ utility.
 
 .. note::
@@ -47,6 +47,15 @@ Make sure your Teradata Airflow connection is defined with the required fields:
 
 You can define a remote host with a separate SSH connection using the ``ssh_conn_id``.
 
+
+Ensure that the Teradata BTEQ utility is installed on the machine where the SQL statements or scripts will be executed. This could be:
+
+- The **local machine** where Airflow runs the task, for local execution.
+- The **remote host** accessed via SSH, for remote execution.
+
+If executing remotely, also ensure that an SSH server (e.g., ``sshd``) is running and accessible on the remote machine.
+
+
 .. note::
 
     For improved security, it is **highly recommended** to use
@@ -59,6 +68,7 @@ You can define a remote host with a separate SSH connection using the ``ssh_conn
 
     https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/ssh.html
 
+
 To execute arbitrary SQL or BTEQ commands in a Teradata database, use the
 :class:`~airflow.providers.teradata.operators.bteq.BteqOperator`.
 
diff --git a/providers/teradata/src/airflow/providers/teradata/hooks/bteq.py b/providers/teradata/src/airflow/providers/teradata/hooks/bteq.py
index dae400a638825..12d108c263d55 100644
--- a/providers/teradata/src/airflow/providers/teradata/hooks/bteq.py
+++ b/providers/teradata/src/airflow/providers/teradata/hooks/bteq.py
@@ -37,6 +37,7 @@
     verify_bteq_installed,
     verify_bteq_installed_remote,
 )
+from airflow.providers.teradata.utils.constants import Constants
 from airflow.providers.teradata.utils.encryption_utils import (
     decrypt_remote_file_to_string,
     generate_encrypted_file_with_openssl,
@@ -158,7 +159,7 @@ def _transfer_to_and_execute_bteq_on_remote(
         if self.ssh_hook and self.ssh_hook.get_conn():
             with self.ssh_hook.get_conn() as ssh_client:
                 if ssh_client is None:
-                    raise AirflowException("Failed to establish SSH connection. `ssh_client` is None.")
+                    raise AirflowException(Constants.BTEQ_REMOTE_ERROR_MSG)
                 verify_bteq_installed_remote(ssh_client)
                 password = generate_random_password()  # Encryption/Decryption password
                 encrypted_file_path = os.path.join(tmp_dir, "bteq_script.enc")
@@ -170,7 +171,6 @@ def _transfer_to_and_execute_bteq_on_remote(
                 )
                 remote_encrypted_path = os.path.join(remote_working_dir or "", "bteq_script.enc")
                 remote_encrypted_path = remote_encrypted_path.replace("/", "\\")
-
                 transfer_file_sftp(ssh_client, encrypted_file_path, remote_encrypted_path)
 
                 bteq_command_str = prepare_bteq_command_for_remote_execution(
@@ -204,24 +204,20 @@ def _transfer_to_and_execute_bteq_on_remote(
                         else [bteq_quit_rc if bteq_quit_rc is not None else 0]
                     )
                 ):
-                    raise AirflowException(f"BTEQ task failed with error: {failure_message}")
+                    raise AirflowException(f"Failed to execute BTEQ script: {failure_message}")
                 if failure_message:
                     self.log.warning(failure_message)
                 return exit_status
             else:
-                raise AirflowException("SSH connection is not established. `ssh_hook` is None or invalid.")
+                raise AirflowException(Constants.BTEQ_REMOTE_ERROR_MSG)
         except (OSError, socket.gaierror):
-            raise AirflowException(
-                "SSH connection timed out. Please check the network or server availability."
-            )
+            raise AirflowException(Constants.BTEQ_REMOTE_ERROR_MSG)
         except SSHException as e:
-            raise AirflowException(f"An unexpected error occurred during SSH connection: {str(e)}")
+            raise AirflowException(f"{Constants.BTEQ_REMOTE_ERROR_MSG}: {str(e)}")
         except AirflowException as e:
             raise e
         except Exception as e:
-            raise AirflowException(
-                f"An unexpected error occurred while executing BTEQ script on remote machine: {str(e)}"
-            )
+            raise AirflowException(f"{Constants.BTEQ_REMOTE_ERROR_MSG}: {str(e)}")
         finally:
             # Remove the local script file
             if encrypted_file_path and os.path.exists(encrypted_file_path):
@@ -276,12 +272,12 @@ def execute_bteq_script_at_local(
             process.wait(timeout=timeout + 60)  # Adding 1 minute extra for BTEQ script timeout
         except subprocess.TimeoutExpired:
             self.on_kill()
-            raise AirflowException(f"BTEQ command timed out after {timeout} seconds.")
+            raise AirflowException(Constants.BTEQ_TIMEOUT_ERROR_MSG % timeout)
         conn = self.get_conn()
         conn["sp"] = process  # For `on_kill` support
         failure_message = None
         if stdout_data is None:
-            raise AirflowException("Process stdout is None. Unable to read BTEQ output.")
+            raise AirflowException(Constants.BTEQ_UNEXPECTED_ERROR_MSG)
         decoded_line = ""
         for line in stdout_data.splitlines():
             try:
@@ -302,7 +298,7 @@ def execute_bteq_script_at_local(
                 else [bteq_quit_rc if bteq_quit_rc is not None else 0]
             )
         ):
-            raise AirflowException(f"BTEQ task failed with error: {failure_message}")
+            raise AirflowException(f"{Constants.BTEQ_UNEXPECTED_ERROR_MSG}: {failure_message}")
 
         if failure_message:
             self.log.warning(failure_message)
@@ -320,7 +316,7 @@ def on_kill(self):
                 self.log.warning("Subprocess did not terminate in time. Forcing kill...")
                 process.kill()
         except Exception as e:
-            self.log.error("Failed to terminate subprocess: %s", str(e))
+            self.log.error("%s: %s", Constants.BTEQ_UNEXPECTED_ERROR_MSG, str(e))
 
     def get_airflow_home_dir(self) -> str:
         """Get the AIRFLOW_HOME directory."""
@@ -331,7 +327,9 @@ def preferred_temp_directory(self, prefix="bteq_"):
         try:
             temp_dir = tempfile.gettempdir()
             if not os.path.isdir(temp_dir) or not os.access(temp_dir, os.W_OK):
-                raise OSError("OS temp dir not usable")
+                raise OSError(
+                    f"Failed to execute the BTEQ script because the temporary directory {temp_dir} is not writable."
+ ) except Exception: temp_dir = self.get_airflow_home_dir() diff --git a/providers/teradata/src/airflow/providers/teradata/operators/bteq.py b/providers/teradata/src/airflow/providers/teradata/operators/bteq.py index 0496201fbae07..9d043304240e9 100644 --- a/providers/teradata/src/airflow/providers/teradata/operators/bteq.py +++ b/providers/teradata/src/airflow/providers/teradata/operators/bteq.py @@ -27,6 +27,7 @@ prepare_bteq_script_for_remote_execution, read_file, ) +from airflow.providers.teradata.utils.constants import Constants if TYPE_CHECKING: from paramiko import SSHClient @@ -35,7 +36,6 @@ from airflow.sdk.definitions.context import Context except ImportError: from airflow.utils.context import Context - from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.providers.teradata.hooks.bteq import BteqHook from airflow.providers.teradata.hooks.teradata import TeradataHook @@ -114,9 +114,7 @@ def __init__( def execute(self, context: Context) -> int | None: """Execute BTEQ code using the BteqHook.""" if not self.sql and not self.file_path: - raise ValueError( - "BteqOperator requires either the 'sql' or 'file_path' parameter. Both are missing." - ) + raise ValueError(Constants.BTEQ_MISSED_PARAMS) self._hook = BteqHook(teradata_conn_id=self.teradata_conn_id, ssh_conn_id=self.ssh_conn_id) self._ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id) if self.ssh_conn_id else None @@ -159,15 +157,13 @@ def execute(self, context: Context) -> int | None: ) if self.file_path: if not is_valid_file(self.file_path): - raise ValueError( - f"The provided file path '{self.file_path}' is invalid or does not exist." - ) + raise ValueError(Constants.BTEQ_INVALID_PATH % self.file_path) try: is_valid_encoding(self.file_path, self.temp_file_read_encoding or "UTF-8") except UnicodeDecodeError as e: - errmsg = f"The provided file '{self.file_path}' encoding is different from BTEQ I/O encoding i.e.'UTF-8'." + errmsg = Constants.BTEQ_INVALID_CHARSET % (self.file_path, "UTF-8") if self.bteq_script_encoding: - errmsg = f"The provided file '{self.file_path}' encoding is different from the specified BTEQ I/O encoding '{self.bteq_script_encoding}'." + errmsg = Constants.BTEQ_INVALID_CHARSET % (self.file_path, self.bteq_script_encoding) raise ValueError(errmsg) from e return self._handle_local_bteq_file( file_path=self.file_path, @@ -200,13 +196,9 @@ def execute(self, context: Context) -> int | None: file_path=self.file_path, context=context, ) - raise ValueError( - f"The provided remote file path '{self.file_path}' is invalid or file does not exist on remote machine at given path." - ) + raise ValueError(Constants.BTEQ_REMOTE_FILE_PATH_INVALID % self.file_path) else: - raise ValueError( - "BteqOperator requires either the 'sql' or 'file_path' parameter. Both are missing." - ) + raise ValueError(Constants.BTEQ_MISSED_PARAMS) return None def _handle_remote_bteq_file( @@ -242,9 +234,7 @@ def _handle_remote_bteq_file( self.temp_file_read_encoding, ) return None - raise ValueError( - "Please provide a valid file path for the BTEQ script to be executed on the remote machine." 
-        )
+        raise ValueError(Constants.BTEQ_MISSED_PARAMS)
 
     def _handle_local_bteq_file(
         self,
diff --git a/providers/teradata/src/airflow/providers/teradata/operators/teradata_compute_cluster.py b/providers/teradata/src/airflow/providers/teradata/operators/teradata_compute_cluster.py
index 9a5158e6e96b4..ad0922240e594 100644
--- a/providers/teradata/src/airflow/providers/teradata/operators/teradata_compute_cluster.py
+++ b/providers/teradata/src/airflow/providers/teradata/operators/teradata_compute_cluster.py
@@ -125,34 +125,40 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
         """
         self._compute_cluster_execute_complete(event)
 
-    def _compute_cluster_execute(self):
+    def _compute_cluster_execute(self, operation: str | None = None):
         # Verifies the provided compute profile name.
         if (
             self.compute_profile_name is None
             or self.compute_profile_name == "None"
             or self.compute_profile_name == ""
         ):
-            self.log.info("Invalid compute cluster profile name")
-            raise AirflowException(Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG)
-        # Verifies if the provided Teradata instance belongs to Vantage Cloud Lake.
-        lake_support_find_sql = "SELECT count(1) from DBC.StorageV WHERE StorageName='TD_OFSSTORAGE'"
-        lake_support_result = self.hook.run(lake_support_find_sql, handler=_single_result_row_handler)
-        if lake_support_result is None:
-            raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
+            raise AirflowException(Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG % operation)
+        try:
+            # Verifies if the provided Teradata instance belongs to Vantage Cloud Lake.
+            lake_support_find_sql = "SELECT count(1) from DBC.StorageV WHERE StorageName='TD_OFSSTORAGE'"
+            lake_support_result = self.hook.run(lake_support_find_sql, handler=_single_result_row_handler)
+            if lake_support_result is None:
+                raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG % operation)
+        except Exception:
+            raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG % operation)
         # Getting teradata db version. Considering teradata instance is Lake when db version is 20 or above
         db_version_get_sql = "SELECT InfoData AS Version FROM DBC.DBCInfoV WHERE InfoKey = 'VERSION'"
         try:
             db_version_result = self.hook.run(db_version_get_sql, handler=_single_result_row_handler)
             if db_version_result is not None:
-                db_version_result = str(db_version_result)
-                db_version = db_version_result.split(".")[0]
+                # Safely extract the actual version string from the result
+                if isinstance(db_version_result, (list, tuple)) and db_version_result:
+                    # e.g., if it's a tuple like ('17.10',), get the first element
+                    version_str = str(db_version_result[0])
+                else:
+                    version_str = str(db_version_result)  # fallback, should be rare
+                db_version = version_str.split(".")[0]
                 if db_version is not None and int(db_version) < 20:
-                    raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)
+                    raise AirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG % operation)
             else:
-                raise AirflowException("Error occurred while getting teradata database version")
-        except Exception as ex:
-            self.log.error("Error occurred while getting teradata database version: %s ", str(ex))
-            raise AirflowException("Error occurred while getting teradata database version")
+                raise AirflowException(Constants.CC_ERR_VERSION_GET)
+        except AirflowException:
+            # Preserve the more specific message (e.g. lake-support-only) instead of
+            # masking it with the generic version-lookup error.
+            raise
+        except Exception:
+            raise AirflowException(Constants.CC_ERR_VERSION_GET)
 
     def _compute_cluster_execute_complete(self, event: dict[str, Any]) -> None:
         if event["status"] == "success":
@@ -270,8 +276,8 @@ def execute(self, context: Context):
         """
         return self._compute_cluster_execute()
 
-    def _compute_cluster_execute(self):
-        super()._compute_cluster_execute()
+    def _compute_cluster_execute(self, operation: str | None = None):
+        super()._compute_cluster_execute("Provision")
         if self.compute_group_name:
             cg_status_query = (
                 "SELECT count(1) FROM DBC.ComputeGroups WHERE UPPER(ComputeGroupName) = UPPER('"
                 + self.compute_group_name
@@ -300,7 +306,8 @@ def _compute_cluster_execute(self):
             cp_status_result = self._hook_run(cp_status_query, handler=_single_result_row_handler)
             if cp_status_result is not None:
                 cp_status_result = str(cp_status_result)
-                msg = f"Compute Profile {self.compute_profile_name} is already exists under Compute Group {self.compute_group_name}. Status is {cp_status_result}"
+                msg = f"Compute Profile '{self.compute_profile_name}' already exists under Compute Group '{self.compute_group_name}'. Status: {cp_status_result}."
+ self.log.info(msg) return cp_status_result create_cp_query = self._build_ccp_setup_query() @@ -357,21 +364,22 @@ def execute(self, context: Context): """ return self._compute_cluster_execute() - def _compute_cluster_execute(self): - super()._compute_cluster_execute() + def _compute_cluster_execute(self, operation: str | None = None): + super()._compute_cluster_execute("Decommission") cp_drop_query = "DROP COMPUTE PROFILE " + self.compute_profile_name if self.compute_group_name: cp_drop_query = cp_drop_query + " IN COMPUTE GROUP " + self.compute_group_name self._hook_run(cp_drop_query, handler=_single_result_row_handler) self.log.info( - "Compute Profile %s IN Compute Group %s is successfully dropped", + "Compute Profile %s in Compute Group %s is successfully dropped.", self.compute_profile_name, self.compute_group_name, ) - if self.delete_compute_group: + + if self.delete_compute_group and self.compute_group_name: cg_drop_query = "DROP COMPUTE GROUP " + self.compute_group_name self._hook_run(cg_drop_query, handler=_single_result_row_handler) - self.log.info("Compute Group %s is successfully dropped", self.compute_group_name) + self.log.info("Compute Group %s is successfully dropped.", self.compute_group_name) class TeradataComputeClusterResumeOperator(_TeradataComputeClusterOperator): @@ -417,8 +425,8 @@ def execute(self, context: Context): """ return self._compute_cluster_execute() - def _compute_cluster_execute(self): - super()._compute_cluster_execute() + def _compute_cluster_execute(self, operation: str | None = None): + super()._compute_cluster_execute("Resume") cc_status_query = ( "SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('" + self.compute_profile_name @@ -432,15 +440,16 @@ def _compute_cluster_execute(self): # Generates an error message if the compute cluster does not exist for the specified # compute profile and compute group. else: - self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG) - raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG) + raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG % operation) if cp_status_result != Constants.CC_RESUME_DB_STATUS: cp_resume_query = f"RESUME COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}" if self.compute_group_name: cp_resume_query = f"{cp_resume_query} IN COMPUTE GROUP {self.compute_group_name}" return self._handle_cc_status(Constants.CC_RESUME_OPR, cp_resume_query) self.log.info( - "Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_RESUME_DB_STATUS + "Compute Cluster %s is already in '%s' status.", + self.compute_profile_name, + Constants.CC_RESUME_DB_STATUS, ) @@ -487,8 +496,8 @@ def execute(self, context: Context): """ return self._compute_cluster_execute() - def _compute_cluster_execute(self): - super()._compute_cluster_execute() + def _compute_cluster_execute(self, operation: str | None = None): + super()._compute_cluster_execute("Suspend") sql = ( "SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('" + self.compute_profile_name @@ -502,13 +511,14 @@ def _compute_cluster_execute(self): # Generates an error message if the compute cluster does not exist for the specified # compute profile and compute group. 
else: - self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG) - raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG) + raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG % operation) if result != Constants.CC_SUSPEND_DB_STATUS: sql = f"SUSPEND COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}" if self.compute_group_name: sql = f"{sql} IN COMPUTE GROUP {self.compute_group_name}" return self._handle_cc_status(Constants.CC_SUSPEND_OPR, sql) self.log.info( - "Compute Cluster %s already %s", self.compute_profile_name, Constants.CC_SUSPEND_DB_STATUS + "Compute Cluster %s is already in '%s' status.", + self.compute_profile_name, + Constants.CC_SUSPEND_DB_STATUS, ) diff --git a/providers/teradata/src/airflow/providers/teradata/triggers/teradata_compute_cluster.py b/providers/teradata/src/airflow/providers/teradata/triggers/teradata_compute_cluster.py index df9e44b79b595..74b93c30f4130 100644 --- a/providers/teradata/src/airflow/providers/teradata/triggers/teradata_compute_cluster.py +++ b/providers/teradata/src/airflow/providers/teradata/triggers/teradata_compute_cluster.py @@ -73,8 +73,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: while True: status = await self.get_status() if status is None or len(status) == 0: - self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG) - raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG) + raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG % "manage") if ( self.operation_type == Constants.CC_SUSPEND_OPR or self.operation_type == Constants.CC_CREATE_SUSPEND_OPR @@ -108,8 +107,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent( { "status": "error", - "message": Constants.CC_OPR_FAILURE_STATUS_MSG - % (self.compute_profile_name, self.operation_type), + "message": Constants.CC_OPR_TIMEOUT_ERROR + % (self.operation_type, self.compute_profile_name), } ) elif ( @@ -128,8 +127,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent( { "status": "error", - "message": Constants.CC_OPR_FAILURE_STATUS_MSG - % (self.compute_profile_name, self.operation_type), + "message": Constants.CC_OPR_TIMEOUT_ERROR + % (self.operation_type, self.compute_profile_name), } ) else: @@ -137,7 +136,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: except Exception as e: yield TriggerEvent({"status": "error", "message": str(e)}) except asyncio.CancelledError: - self.log.error(Constants.CC_OPR_TIMEOUT_ERROR, self.operation_type) + self.log.error(Constants.CC_OPR_TIMEOUT_ERROR, self.operation_type, self.compute_profile_name) async def get_status(self) -> str: """Return compute cluster SUSPEND/RESUME operation status.""" diff --git a/providers/teradata/src/airflow/providers/teradata/utils/constants.py b/providers/teradata/src/airflow/providers/teradata/utils/constants.py index ee356ceb402e5..37edf8a61161f 100644 --- a/providers/teradata/src/airflow/providers/teradata/utils/constants.py +++ b/providers/teradata/src/airflow/providers/teradata/utils/constants.py @@ -29,18 +29,21 @@ class Constants: CC_SUSPEND_DB_STATUS = "Suspended" CC_RESUME_DB_STATUS = "Running" CC_OPR_SUCCESS_STATUS_MSG = "Compute Cluster %s %s operation completed successfully." - CC_OPR_FAILURE_STATUS_MSG = "Compute Cluster %s %s operation has failed." - CC_OPR_INITIALIZING_STATUS_MSG = "The environment is currently initializing. Please wait." - CC_OPR_EMPTY_PROFILE_ERROR_MSG = "Please provide a valid name for the compute cluster profile." 
-    CC_GRP_PRP_NON_EXISTS_MSG = "The specified Compute cluster is not present or The user doesn't have permission to access compute cluster."
-    CC_GRP_PRP_UN_AUTHORIZED_MSG = "The %s operation is not authorized for the user."
-    CC_GRP_LAKE_SUPPORT_ONLY_MSG = "Compute Groups is supported only on Vantage Cloud Lake."
-    CC_OPR_TIMEOUT_ERROR = (
-        "There is an issue with the %s operation. Kindly consult the administrator for assistance."
+    CC_OPR_EMPTY_PROFILE_ERROR_MSG = "Failed to %s the Vantage Cloud Lake Compute Cluster Instance due to an invalid compute cluster profile name."
+    CC_GRP_PRP_NON_EXISTS_MSG = "Failed to %s the Vantage Cloud Lake Compute Cluster Instance because the specified compute cluster does not exist or the user lacks the necessary permissions to access the Compute Cluster Instance."
+    CC_GRP_LAKE_SUPPORT_ONLY_MSG = "Failed to %s the Vantage Cloud Lake Compute Cluster Instance because the Compute Cluster feature is supported only on the Vantage Cloud Lake system."
+    CC_OPR_TIMEOUT_ERROR = "Failed to %s the Vantage Cloud Lake Compute Cluster Instance `%s`. Please contact the administrator for assistance."
+    CC_ERR_VERSION_GET = "Failed to manage the Vantage Cloud Lake Compute Cluster Instance due to an error while getting the Teradata database version."
+    BTEQ_REMOTE_ERROR_MSG = (
+        "Failed to establish an SSH connection to the remote machine for executing the BTEQ script."
     )
-    CC_GRP_PRP_EXISTS_MSG = "The specified Compute cluster is already exists."
-    CC_OPR_EMPTY_COPY_PROFILE_ERROR_MSG = (
-        "Please provide a valid name for the source and target compute profile."
+    BTEQ_UNEXPECTED_ERROR_MSG = "Failure while executing BTEQ script due to an unexpected error"
+    BTEQ_TIMEOUT_ERROR_MSG = "Failed to execute BTEQ script due to timeout after %s seconds."
+    BTEQ_MISSED_PARAMS = "Failed to execute BTEQ script due to missing required parameters: either 'sql' or 'file_path' must be provided."
+    BTEQ_INVALID_PATH = (
+        "Failed to execute BTEQ script due to invalid file path: '%s' does not exist or is inaccessible."
     )
+    BTEQ_INVALID_CHARSET = "Failed to execute BTEQ script because the provided file '%s' encoding differs from the specified BTEQ I/O encoding '%s'."
+    BTEQ_REMOTE_FILE_PATH_INVALID = "Failed to execute BTEQ script due to invalid remote file path: '%s' does not exist or is inaccessible on the remote machine."
 
     CC_OPR_TIME_OUT = 1200
     CC_POLL_INTERVAL = 60
diff --git a/providers/teradata/tests/unit/teradata/hooks/test_bteq.py b/providers/teradata/tests/unit/teradata/hooks/test_bteq.py
index 38269ecb99d17..e300ebdc104d9 100644
--- a/providers/teradata/tests/unit/teradata/hooks/test_bteq.py
+++ b/providers/teradata/tests/unit/teradata/hooks/test_bteq.py
@@ -186,7 +186,10 @@ def test_execute_bteq_script_at_local_failure_raises(
     mock_popen.return_value = mock_process
     mock_prepare_cmd.return_value = "bteq_command"
 
-    with pytest.raises(AirflowException, match="BTEQ task failed with error: Failure: some error occurred"):
+    with pytest.raises(
+        AirflowException,
+        match="Failure while executing BTEQ script due to an unexpected error: Failure: some error occurred",
+    ):
         hook.execute_bteq_script_at_local(
             bteq_script="SELECT * FROM test;",
             bteq_script_encoding="utf-8",
@@ -248,6 +251,7 @@ def test_execute_bteq_script_at_remote_success(
     mock_ssh_client.exec_command.return_value = (mock_stdin, mock_stdout, mock_stderr)
 
     # Instantiate BteqHook
+
     hook = BteqHook(ssh_conn_id="ssh_conn_id", teradata_conn_id="teradata_conn")
 
     # Call method under test
@@ -322,7 +326,10 @@ def test_transfer_to_and_execute_bteq_on_remote_ssh_failure(mock_verify, hook_wi
             bteq_session_encoding="utf-8",
             tmp_dir="/tmp",
         )
-    assert "SSH connection is not established" in str(excinfo.value)
+    assert (
+        "Failed to establish an SSH connection to the remote machine for executing the BTEQ script."
+        in str(excinfo.value)
+    )
 
 
 @patch("airflow.providers.teradata.hooks.bteq.verify_bteq_installed_remote")
@@ -382,5 +389,5 @@ def test_remote_execution_cleanup_on_exception(
             tmp_dir=temp_dir,
         )
 
-    # Verify local encrypted file is deleted
+    # After exception, encrypted file should be deleted
     assert not os.path.exists(encrypted_file_path)
diff --git a/providers/teradata/tests/unit/teradata/operators/test_bteq.py b/providers/teradata/tests/unit/teradata/operators/test_bteq.py
index 7afac54d311f8..856083e57adc7 100644
--- a/providers/teradata/tests/unit/teradata/operators/test_bteq.py
+++ b/providers/teradata/tests/unit/teradata/operators/test_bteq.py
@@ -150,7 +150,10 @@ def test_template_fields(self):
 
     def test_execute_raises_if_no_sql_or_file(self):
         op = BteqOperator(task_id="fail_case", teradata_conn_id="td_conn")
-        with pytest.raises(ValueError, match="requires either the 'sql' or 'file_path' parameter"):
+        with pytest.raises(
+            ValueError,
+            match="Failed to execute BTEQ script due to missing required parameters: either 'sql' or 'file_path' must be provided.",
+        ):
             op.execute({})
 
     @mock.patch("airflow.providers.teradata.operators.bteq.is_valid_file", return_value=False)
@@ -160,7 +163,7 @@ def test_invalid_file_path(self, mock_is_valid_file):
             file_path="/invalid/path.sql",
             teradata_conn_id="td_conn",
         )
-        with pytest.raises(ValueError, match="is invalid or does not exist"):
+        with pytest.raises(ValueError, match="Failed to execute BTEQ script due to invalid file path"):
             op.execute({})
 
     @mock.patch("airflow.providers.teradata.operators.bteq.is_valid_file", return_value=True)
@@ -175,7 +178,10 @@ def test_file_encoding_error(self, mock_encoding, mock_valid_file):
             bteq_script_encoding="UTF-8",
             teradata_conn_id="td_conn",
         )
-        with pytest.raises(ValueError, match="encoding is different from BTEQ I/O encoding"):
+        with pytest.raises(
+            ValueError,
+            match="Failed to execute BTEQ script because the provided file.*encoding differs from the specified BTEQ I/O encoding",
+        ):
             op.execute({})
 
    @mock.patch("airflow.providers.teradata.operators.bteq.BteqHook.execute_bteq_script")
diff --git a/providers/teradata/tests/unit/teradata/triggers/test_teradata_compute_cluster.py b/providers/teradata/tests/unit/teradata/triggers/test_teradata_compute_cluster.py
index ef6e2f1ab191f..a9a1fedce6c87 100644
--- a/providers/teradata/tests/unit/teradata/triggers/test_teradata_compute_cluster.py
+++ b/providers/teradata/tests/unit/teradata/triggers/test_teradata_compute_cluster.py
@@ -81,7 +81,9 @@ async def test_run_suspend_failure():
     with patch.object(trigger, "get_status") as mock_get_status:
         mock_get_status.return_value = None
         async for event in trigger.run():
-            assert event == TriggerEvent({"status": "error", "message": Constants.CC_GRP_PRP_NON_EXISTS_MSG})
+            assert event == TriggerEvent(
+                {"status": "error", "message": Constants.CC_GRP_PRP_NON_EXISTS_MSG % "manage"}
+            )
         mock_get_status.assert_called_once()
 
@@ -117,7 +119,9 @@ async def test_run_resume_failure():
     with patch.object(trigger, "get_status") as mock_get_status:
         mock_get_status.return_value = None
         async for event in trigger.run():
-            assert event == TriggerEvent({"status": "error", "message": Constants.CC_GRP_PRP_NON_EXISTS_MSG})
+            assert event == TriggerEvent(
+                {"status": "error", "message": Constants.CC_GRP_PRP_NON_EXISTS_MSG % "manage"}
+            )
         mock_get_status.assert_called_once()
 
diff --git a/providers/teradata/tests/unit/teradata/utils/test_constants.py b/providers/teradata/tests/unit/teradata/utils/test_constants.py
index 0760337537a11..c6416d6f41283 100644
--- a/providers/teradata/tests/unit/teradata/utils/test_constants.py
+++ b/providers/teradata/tests/unit/teradata/utils/test_constants.py
@@ -57,51 +57,21 @@ def test_operation_success_message():
     assert expected_msg == Constants.CC_OPR_SUCCESS_STATUS_MSG
 
 
-def test_operation_failure_message():
-    expected_msg = "Compute Cluster %s %s operation has failed."
-    assert expected_msg == Constants.CC_OPR_FAILURE_STATUS_MSG
-
-
-def test_initializing_status_message():
-    expected_msg = "The environment is currently initializing. Please wait."
-    assert expected_msg == Constants.CC_OPR_INITIALIZING_STATUS_MSG
-
-
 def test_empty_profile_error_message():
-    expected_msg = "Please provide a valid name for the compute cluster profile."
+    expected_msg = "Failed to %s the Vantage Cloud Lake Compute Cluster Instance due to an invalid compute cluster profile name."
     assert expected_msg == Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG
 
 
 def test_non_exists_message():
-    expected_msg = "The specified Compute cluster is not present or The user doesn't have permission to access compute cluster."
+    expected_msg = "Failed to %s the Vantage Cloud Lake Compute Cluster Instance because the specified compute cluster does not exist or the user lacks the necessary permissions to access the Compute Cluster Instance."
     assert expected_msg == Constants.CC_GRP_PRP_NON_EXISTS_MSG
 
 
-def test_unauthorized_message():
-    expected_msg = "The %s operation is not authorized for the user."
-    assert expected_msg == Constants.CC_GRP_PRP_UN_AUTHORIZED_MSG
-
-
 def test_lake_support_only_message():
-    expected_msg = "Compute Groups is supported only on Vantage Cloud Lake."
+    expected_msg = "Failed to %s the Vantage Cloud Lake Compute Cluster Instance because the Compute Cluster feature is supported only on the Vantage Cloud Lake system."
     assert expected_msg == Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG
 
 
-def test_timeout_error_message():
-    expected_msg = "There is an issue with the %s operation. Kindly consult the administrator for assistance."
-    assert expected_msg == Constants.CC_OPR_TIMEOUT_ERROR
-
-
-def test_exists_message():
-    expected_msg = "The specified Compute cluster is already exists."
-    assert expected_msg == Constants.CC_GRP_PRP_EXISTS_MSG
-
-
-def test_empty_copy_profile_error_message():
-    expected_msg = "Please provide a valid name for the source and target compute profile."
-    assert expected_msg == Constants.CC_OPR_EMPTY_COPY_PROFILE_ERROR_MSG
-
-
 def test_timeout_value():
     assert Constants.CC_OPR_TIME_OUT == 1200
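
Reviewer note: for quick reference, a minimal usage sketch of the operator this patch documents and re-messages. It is illustrative only — the DAG id, connection ids, schedule, and SQL below are placeholders, not part of this change:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.bteq import BteqOperator

with DAG(
    dag_id="example_bteq",  # placeholder DAG id
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
):
    # Runs the statement locally via the BTEQ utility; passing ssh_conn_id
    # as well would run it on a remote host where BTEQ is installed.
    run_bteq = BteqOperator(
        task_id="run_bteq",
        teradata_conn_id="teradata_default",  # assumed connection id
        sql="SELECT COUNT(*) FROM dbc.dbcinfo;",  # placeholder SQL
    )
```

Note that the reworked messages are %-style templates, so call sites must interpolate before raising — e.g. `Constants.CC_GRP_PRP_NON_EXISTS_MSG % "Resume"` renders "Failed to Resume the Vantage Cloud Lake Compute Cluster Instance because ...". This is also why the timeout path above uses `Constants.BTEQ_TIMEOUT_ERROR_MSG % timeout` rather than passing `timeout` as a second positional argument to `AirflowException`, which would stringify as a tuple instead of filling the `%s` slot.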