Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,12 @@ async def wait_command_execution_result(
self.log.exception("Exception occurred while polling CMD result")
raise AirflowException(ex)

result_dict = PollAirflowCommandResponse.to_dict(result)
try:
result_dict = PollAirflowCommandResponse.to_dict(result)
except Exception as ex:
self.log.exception("Exception occurred while transforming PollAirflowCommandResponse")
raise AirflowException(ex)

if result_dict["output_end"]:
return result_dict

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,15 @@ def execute(self, context: Context):
metadata=self.metadata,
poll_interval=self.poll_interval,
)
result_str = self._merge_cmd_output_result(result)
self.log.info("Command execution result:\n%s", result_str)
return result
exit_code = result.get("exit_info", {}).get("exit_code")
if exit_code == 0:
result_str = self._merge_cmd_output_result(result)
self.log.info("Command execution result:\n%s", result_str)
return result

error_output = "".join(line["content"] for line in result.get("error", []))
message = f"Airflow CLI command failed with exit code {exit_code}.\nError output:\n{error_output}"
raise AirflowException(message)

def execute_complete(self, context: Context, event: dict) -> dict:
if event and event["status"] == "error":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,23 @@ async def run(self):
)
return

exit_code = result.get("exit_info", {}).get("exit_code")

if exit_code == 0:
yield TriggerEvent(
{
"status": "success",
"result": result,
}
)
return

error_output = "".join(line["content"] for line in result.get("error", []))
message = f"Airflow CLI command failed with exit code {exit_code}.\nError output:\n{error_output}"
yield TriggerEvent(
{
"status": "success",
"result": result,
"status": "error",
"message": message,
}
)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ class TestCloudComposerRunAirflowCLICommandOperator:
@mock.patch(COMPOSER_STRING.format("ExecuteAirflowCommandResponse.to_dict"))
@mock.patch(COMPOSER_STRING.format("CloudComposerHook"))
def test_execute(self, mock_hook, to_dict_mode) -> None:
mock_hook.return_value.wait_command_execution_result.return_value = {
"exit_info": {"exit_code": 0},
"output": [
{"content": "test"},
],
}
op = CloudComposerRunAirflowCLICommandOperator(
task_id=TASK_ID,
project_id=TEST_GCP_PROJECT,
Expand Down
Loading