diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py index d7fbc438fd955..5026b63005ebe 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py @@ -642,7 +642,12 @@ async def wait_command_execution_result( self.log.exception("Exception occurred while polling CMD result") raise AirflowException(ex) - result_dict = PollAirflowCommandResponse.to_dict(result) + try: + result_dict = PollAirflowCommandResponse.to_dict(result) + except Exception as ex: + self.log.exception("Exception occurred while transforming PollAirflowCommandResponse") + raise AirflowException(ex) + if result_dict["output_end"]: return result_dict diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py index 59704b3b4cb2e..8d5a13561d470 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py @@ -764,9 +764,15 @@ def execute(self, context: Context): metadata=self.metadata, poll_interval=self.poll_interval, ) - result_str = self._merge_cmd_output_result(result) - self.log.info("Command execution result:\n%s", result_str) - return result + exit_code = result.get("exit_info", {}).get("exit_code") + if exit_code == 0: + result_str = self._merge_cmd_output_result(result) + self.log.info("Command execution result:\n%s", result_str) + return result + + error_output = "".join(line["content"] for line in result.get("error", [])) + message = f"Airflow CLI command failed with exit code {exit_code}.\nError output:\n{error_output}" + raise AirflowException(message) def execute_complete(self, context: Context, event: dict) -> dict: if event and event["status"] == "error": diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py index 96748bc2a7c49..144de73469ad8 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py +++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py @@ -145,10 +145,23 @@ async def run(self): ) return + exit_code = result.get("exit_info", {}).get("exit_code") + + if exit_code == 0: + yield TriggerEvent( + { + "status": "success", + "result": result, + } + ) + return + + error_output = "".join(line["content"] for line in result.get("error", [])) + message = f"Airflow CLI command failed with exit code {exit_code}.\nError output:\n{error_output}" yield TriggerEvent( { - "status": "success", - "result": result, + "status": "error", + "message": message, } ) return diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py index e882db9526a01..e6da6cf5cf9f6 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py +++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py @@ -319,6 +319,12 @@ class TestCloudComposerRunAirflowCLICommandOperator: @mock.patch(COMPOSER_STRING.format("ExecuteAirflowCommandResponse.to_dict")) @mock.patch(COMPOSER_STRING.format("CloudComposerHook")) def test_execute(self, mock_hook, to_dict_mode) -> None: + mock_hook.return_value.wait_command_execution_result.return_value = { + "exit_info": {"exit_code": 0}, + "output": [ + {"content": "test"}, + ], + } op = CloudComposerRunAirflowCLICommandOperator( task_id=TASK_ID, project_id=TEST_GCP_PROJECT,