diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml
index d330c3b96d598..a2ada63834e0a 100644
--- a/.github/workflows/run-unit-tests.yml
+++ b/.github/workflows/run-unit-tests.yml
@@ -169,6 +169,7 @@ jobs:
       AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}"
       VERBOSE: "true"
       DEFAULT_BRANCH: "${{ inputs.default-branch }}"
+      TOTAL_TEST_TIMEOUT: "3600"  # 60 minutes in seconds
     if: inputs.test-group == 'core' || inputs.skip-providers-tests != 'true'
     steps:
       - name: "Cleanup repo"
diff --git a/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py b/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py
index 849d590122c1e..75085d847930e 100644
--- a/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py
@@ -176,7 +176,7 @@ def run_build_in_parallel(
     ]
     check_async_run_results(
         results=results,
-        success="All images built correctly",
+        success_message="All images built correctly",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
@@ -505,7 +505,7 @@ def run_verify_in_parallel(
     ]
     check_async_run_results(
         results=results,
-        success="All images verified",
+        success_message="All images verified",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
diff --git a/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py b/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py
index fcedb398a3b53..d8c68cb03d7de 100644
--- a/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py
@@ -361,7 +361,7 @@ def create_cluster(
     ]
     check_async_run_results(
         results=results,
-        success="All clusters created.",
+        success_message="All clusters created.",
         outputs=outputs,
         skip_cleanup=skip_cleanup,
         include_success_outputs=include_success_outputs,
@@ -699,7 +699,7 @@ def build_k8s_image(
     ]
     check_async_run_results(
         results=results,
-        success="All K8S images built correctly.",
+        success_message="All K8S images built correctly.",
         outputs=outputs,
         skip_cleanup=skip_cleanup,
         include_success_outputs=include_success_outputs,
@@ -776,7 +776,7 @@ def upload_k8s_image(
     ]
     check_async_run_results(
         results=results,
-        success="All K8S images uploaded correctly.",
+        success_message="All K8S images uploaded correctly.",
         outputs=outputs,
         skip_cleanup=skip_cleanup,
         include_success_outputs=include_success_outputs,
@@ -961,7 +961,7 @@ def configure_cluster(
     ]
     check_async_run_results(
         results=results,
-        success="All clusters configured correctly.",
+        success_message="All clusters configured correctly.",
         outputs=outputs,
         skip_cleanup=skip_cleanup,
         include_success_outputs=include_success_outputs,
@@ -1228,7 +1228,7 @@ def deploy_airflow(
     ]
     check_async_run_results(
         results=results,
-        success="All Airflow charts successfully deployed.",
+        success_message="All Airflow charts successfully deployed.",
         outputs=outputs,
         skip_cleanup=skip_cleanup,
         include_success_outputs=include_success_outputs,
@@ -1570,7 +1570,7 @@ def kubernetes_tests_command(
     ]
     check_async_run_results(
         results=results,
-        success="All K8S tests successfully completed.",
+        success_message="All K8S tests successfully completed.",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
@@ -1822,7 +1822,7 @@ def run_complete_tests(
     ]
     check_async_run_results(
         results=results,
-        success="All K8S tests successfully completed.",
+        success_message="All K8S tests successfully completed.",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
diff --git a/dev/breeze/src/airflow_breeze/commands/production_image_commands.py b/dev/breeze/src/airflow_breeze/commands/production_image_commands.py
index f3f06db41c4a5..9aee7a0fa709c 100644
--- a/dev/breeze/src/airflow_breeze/commands/production_image_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/production_image_commands.py
@@ -145,7 +145,7 @@ def run_build_in_parallel(
     ]
     check_async_run_results(
         results=results,
-        success="All images built correctly",
+        success_message="All images built correctly",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
@@ -527,7 +527,7 @@ def run_verify_in_parallel(
     ]
     check_async_run_results(
         results=results,
-        success="All images verified",
+        success_message="All images verified",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
diff --git a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py
index b3a84a26fe156..318dcaa4f1a39 100644
--- a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py
@@ -1200,7 +1200,7 @@ def run_generate_constraints_in_parallel(
     ]
     check_async_run_results(
         results=results,
-        success="All constraints are generated.",
+        success_message="All constraints are generated.",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
@@ -1578,7 +1578,7 @@ def install_provider_distributions(
     ]
     check_async_run_results(
         results=results,
-        success="All packages installed successfully",
+        success_message="All packages installed successfully",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
diff --git a/dev/breeze/src/airflow_breeze/commands/sbom_commands.py b/dev/breeze/src/airflow_breeze/commands/sbom_commands.py
index 1c0a76708025d..449c1b318fef4 100644
--- a/dev/breeze/src/airflow_breeze/commands/sbom_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/sbom_commands.py
@@ -417,7 +417,7 @@ def _dir_exists_warn_and_should_skip(dir: Path, force: bool) -> bool:
     ]
     check_async_run_results(
         results=results,
-        success="All SBOMs were generated successfully",
+        success_message="All SBOMs were generated successfully",
        outputs=outputs,
        include_success_outputs=include_success_outputs,
        skip_cleanup=skip_cleanup,
@@ -601,7 +601,7 @@ def build_all_airflow_images(
     ]
     check_async_run_results(
         results=results,
-        success="All airflow base images were built successfully",
+        success_message="All airflow base images were built successfully",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
@@ -750,7 +750,7 @@ def generate_providers_requirements(
     ]
     check_async_run_results(
         results=results,
-        success="Providers requirements were generated successfully",
+        success_message="Providers requirements were generated successfully",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index 8f58ad078e742..c32c730c4eb6e 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -20,7 +20,9 @@
 import os
 import signal
 import sys
+from collections.abc import Generator
 from datetime import datetime
+from multiprocessing.pool import Pool
 from time import sleep

 import click
@@ -270,7 +272,10 @@ def _run_test(
 def _get_project_names(shell_params: ShellParams) -> tuple[str, str]:
     """Return compose project name and project name."""
     project_name = file_name_from_test_type(shell_params.test_type)
-    compose_project_name = f"airflow-test-{project_name}"
+    if shell_params.test_type == ALL_TEST_TYPE:
+        compose_project_name = "airflow-test"
+    else:
+        compose_project_name = f"airflow-test-{project_name}"
     return compose_project_name, project_name
@@ -283,16 +288,18 @@ def _dump_container_logs(output: Output | None, shell_params: ShellParams):
         text=True,
     )
     container_ids = ps_result.stdout.splitlines()
-    get_console(output=output).print("[info]Wait 10 seconds for logs to find their way to stderr.\n")
+    get_console(output=output).print("[warning]Wait 10 seconds for logs to find their way to stderr.\n")
     sleep(10)
     compose_project_name, project_name = _get_project_names(shell_params)
-    get_console(output=output).print(f"[info]Dumping containers: {container_ids} for {project_name}.\n")
+    get_console(output=output).print(
+        f"[warning]Dumping container logs: {container_ids} for compose project {compose_project_name}.\n"
+    )
     date_str = datetime.now().strftime("%Y_%d_%m_%H_%M_%S")
     for container_id in container_ids:
         if compose_project_name not in container_id:
             continue
         dump_path = FILES_PATH / f"container_logs_{container_id}_{date_str}.log"
-        get_console(output=output).print(f"[info]Dumping container {container_id} to {dump_path}\n")
+        get_console(output=output).print(f"[info]Dumping container log {container_id} to {dump_path}\n")
         with open(dump_path, "w") as outfile:
             run_command(
                 ["docker", "logs", "--details", "--timestamps", container_id],
@@ -312,6 +319,7 @@ def _run_tests_in_pool(
     skip_docker_compose_down: bool,
     test_timeout: int,
     tests_to_run: list[str],
+    handler: TimeoutHandler,
 ):
     if not tests_to_run:
         return
@@ -349,6 +357,7 @@ def _run_tests_in_pool(
             lines_to_search=400,
         ),
     ) as (pool, outputs):
+        handler.set_pool(pool)
         results = [
             pool.apply_async(
                 _run_test,
@@ -366,12 +375,13 @@ def _run_tests_in_pool(
         escaped_tests = [test.replace("[", "\\[") for test in tests_to_run]
         check_async_run_results(
             results=results,
-            success=f"Tests {' '.join(escaped_tests)} completed successfully",
+            success_message=f"Tests {' '.join(escaped_tests)} completed successfully",
             outputs=outputs,
             include_success_outputs=include_success_outputs,
             skip_cleanup=skip_cleanup,
             summarize_on_ci=SummarizeAfter.FAILURE,
             summary_start_regexp=r".*= FAILURES.*|.*= ERRORS.*",
+            terminated_on_timeout=handler.terminated_on_timeout_output_list[0],
         )
@@ -395,6 +405,7 @@ def run_tests_in_parallel(
     parallelism: int,
     skip_cleanup: bool,
     skip_docker_compose_down: bool,
+    handler: TimeoutHandler,
 ) -> None:
     get_console().print("\n[info]Summary of the tests to run\n")
     get_console().print(f"[info]Running tests in parallel with parallelism={parallelism}")
@@ -417,6 +428,7 @@ def run_tests_in_parallel(
         debug_resources=debug_resources,
         skip_cleanup=skip_cleanup,
         skip_docker_compose_down=skip_docker_compose_down,
+        handler=handler,
     )
@@ -1255,43 +1267,29 @@ def python_api_client_tests(
     sys.exit(returncode)


-@contextlib.contextmanager
-def run_with_timeout(timeout: int, shell_params: ShellParams):
-    def timeout_handler(signum, frame):
-        get_console().print("[warning]Timeout reached. Killing the container(s)[/]:")
-        _print_all_containers()
-        if os.environ.get("CI") == "true":
-            get_console().print("[warning]Dumping container logs first[/]:")
-            _dump_container_logs(output=None, shell_params=shell_params)
-        list_of_containers = _get_running_containers().stdout.splitlines()
-        get_console().print("[warning]Attempting to send TERM signal to all remaining containers:")
-        get_console().print(list_of_containers)
-        _send_signal_to_containers(list_of_containers, "SIGTERM")
-        get_console().print(f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop")
-        sleep(GRACE_CONTAINER_STOP_TIMEOUT)
-        containers_left = _get_running_containers().stdout.splitlines()
-        if containers_left:
-            get_console().print("[warning]Some containers are still running. Killing them with SIGKILL:")
-            get_console().print(containers_left)
-            _send_signal_to_containers(list_of_containers, "SIGKILL")
-            get_console().print(
-                f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop"
-            )
-            sleep(GRACE_CONTAINER_STOP_TIMEOUT)
-            containers_left = _get_running_containers().stdout.splitlines()
-            if containers_left:
-                get_console().print("[error]Some containers are still running. Exiting anyway.")
-                get_console().print(containers_left)
-        sys.exit(1)
+class TimeoutHandler:
+    def __init__(self, shell_params: ShellParams, terminated_on_timeout_output_list: list[bool]):
+        # Initialize the timeout handler with shell parameters and a list used to track
+        # timeout termination. The terminated_on_timeout_output_list signals to the outside
+        # world that the run has been terminated, by setting its first element to True
+        # when the timeout is reached.
+        self.shell_params = shell_params
+        self.pool: Pool | None = None
+        self.terminated_on_timeout_output_list = terminated_on_timeout_output_list
+        self.terminated_on_timeout_output_list[0] = False

-    def _send_signal_to_containers(list_of_containers: list[str], signal: str):
+    def set_pool(self, pool: Pool):
+        self.pool = pool
+
+    @staticmethod
+    def _send_signal_to_containers(list_of_containers: list[str], signal_number: str):
         run_command(
-            ["docker", "kill", "--signal", signal, *list_of_containers],
+            ["docker", "kill", "--signal", signal_number, *list_of_containers],
             check=True,
             capture_output=False,
             text=True,
         )

+    @staticmethod
     def _get_running_containers() -> RunCommandResult:
         return run_command(
             ["docker", "ps", "-q"],
@@ -1300,16 +1298,59 @@ def _get_running_containers() -> RunCommandResult:
             text=True,
         )

+    @staticmethod
     def _print_all_containers():
         run_command(
             ["docker", "ps"],
             check=True,
         )

-    signal.signal(signal.SIGALRM, timeout_handler)
+    def timeout_method(self, signum, frame):
+        get_console().print("[warning]Timeout reached.[/]")
+        if self.pool:
+            get_console().print("[warning]Terminating the pool[/]")
+            self.pool.close()
+            self.pool.terminate()
+            # No join here. The pool is joined already in the main function.
+        get_console().print("[warning]Stopping all running containers[/]:")
+        self._print_all_containers()
+        if os.environ.get("CI") == "true":
+            get_console().print("[warning]Dumping container logs first[/]")
+            _dump_container_logs(output=None, shell_params=self.shell_params)
+        list_of_containers = self._get_running_containers().stdout.splitlines()
+        get_console().print("[warning]Attempting to send TERM signal to all remaining containers:")
+        get_console().print(list_of_containers)
+        self._send_signal_to_containers(list_of_containers, "SIGTERM")
+        get_console().print(f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop")
+        sleep(GRACE_CONTAINER_STOP_TIMEOUT)
+        containers_left = self._get_running_containers().stdout.splitlines()
+        if containers_left:
+            get_console().print("[warning]Some containers are still running. Killing them with SIGKILL:")
+            get_console().print(containers_left)
+            self._send_signal_to_containers(list_of_containers, "SIGKILL")
+            get_console().print(
+                f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop"
+            )
+            sleep(GRACE_CONTAINER_STOP_TIMEOUT)
+            containers_left = self._get_running_containers().stdout.splitlines()
+            if containers_left:
+                get_console().print(
+                    "[error]Some containers are still running. Marking the run as terminated anyway."
+                )
+                get_console().print(containers_left)
+        get_console().print(
+            "[warning]All containers stopped. Marking the whole run as terminated on timeout[/]"
+        )
+        self.terminated_on_timeout_output_list[0] = True
+
+
+@contextlib.contextmanager
+def run_with_timeout(timeout: int, shell_params: ShellParams) -> Generator[TimeoutHandler, None, None]:
+    timeout_handler = TimeoutHandler(shell_params=shell_params, terminated_on_timeout_output_list=[False])
+    signal.signal(signal.SIGALRM, lambda signum, frame: timeout_handler.timeout_method(signum, frame))
     signal.alarm(timeout)
     try:
-        yield
+        yield timeout_handler
     finally:
         signal.alarm(0)
@@ -1429,7 +1470,7 @@ def _run_test_command(
                 f"Your test type = {test_type}\n"
             )
             sys.exit(1)
-        with run_with_timeout(total_test_timeout, shell_params=shell_params):
+        with run_with_timeout(total_test_timeout, shell_params=shell_params) as handler:
             run_tests_in_parallel(
                 shell_params=shell_params,
                 extra_pytest_args=extra_pytest_args,
@@ -1439,6 +1480,7 @@ def _run_test_command(
                 skip_cleanup=skip_cleanup,
                 debug_resources=debug_resources,
                 skip_docker_compose_down=skip_docker_compose_down,
+                handler=handler,
             )
     else:
         if shell_params.test_type == ALL_TEST_TYPE:
diff --git a/dev/breeze/src/airflow_breeze/utils/debug_pyproject_toml.py b/dev/breeze/src/airflow_breeze/utils/debug_pyproject_toml.py
index 1b569527705da..5b5a818820e64 100644
--- a/dev/breeze/src/airflow_breeze/utils/debug_pyproject_toml.py
+++ b/dev/breeze/src/airflow_breeze/utils/debug_pyproject_toml.py
@@ -28,7 +28,7 @@ def debug_pyproject_tomls(pyproject_toml_paths: list[Path]) -> None:
     if get_verbose() or get_dry_run():
         for pyproject_toml_path in pyproject_toml_paths:
-            with ci_group(f"Updated {pyproject_toml_path} content", message_type=MessageType.INFO):
+            with ci_group(f"Updated {pyproject_toml_path} content", MessageType.INFO):
                 # Format the content to make it more readable with rich
                 syntax = Syntax(pyproject_toml_path.read_text(), "toml", theme="ansi_dark", line_numbers=True)
                 get_console().print(syntax)
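The refactor above turns the old nested `timeout_handler` closure into a `TimeoutHandler` class, so the SIGALRM handler can also terminate the worker pool and report the timeout back to the caller through a shared one-element list. A minimal, runnable sketch of the same pattern (assumptions: Unix-only `signal.alarm`, a bare `multiprocessing.Pool` instead of Breeze's pool, all container handling and console output omitted):

```python
import signal
import time
from contextlib import contextmanager
from multiprocessing import Pool


class DemoTimeoutHandler:
    def __init__(self):
        self.pool = None
        self.terminated_on_timeout = [False]  # one-element list, like the PR's sentinel

    def set_pool(self, pool):
        self.pool = pool

    def on_timeout(self, signum, frame):
        if self.pool is not None:
            self.pool.close()
            self.pool.terminate()  # stop outstanding workers; no join here
        self.terminated_on_timeout[0] = True


@contextmanager
def run_with_timeout(timeout: int):
    handler = DemoTimeoutHandler()
    signal.signal(signal.SIGALRM, handler.on_timeout)
    signal.alarm(timeout)  # deliver SIGALRM to this process after `timeout` seconds
    try:
        yield handler
    finally:
        signal.alarm(0)  # always cancel the pending alarm


if __name__ == "__main__":
    with run_with_timeout(1) as handler:
        with Pool(2) as pool:
            handler.set_pool(pool)
            pool.apply_async(time.sleep, (30,))  # a task that would overrun the deadline
            time.sleep(2)  # stand-in for waiting on results; the alarm fires meanwhile
    print("terminated on timeout:", handler.terminated_on_timeout[0])  # True
```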
diff --git a/dev/breeze/src/airflow_breeze/utils/image.py b/dev/breeze/src/airflow_breeze/utils/image.py
index 55c9b54a8f621..e6c6b7bb4159d 100644
--- a/dev/breeze/src/airflow_breeze/utils/image.py
+++ b/dev/breeze/src/airflow_breeze/utils/image.py
@@ -89,7 +89,7 @@ def get_kwds(index: int, image_param: BuildCiParams | BuildProdParams):
     ]
     check_async_run_results(
         results=results,
-        success="All images pulled",
+        success_message="All images pulled",
         outputs=outputs,
         include_success_outputs=include_success_outputs,
         skip_cleanup=skip_cleanup,
diff --git a/dev/breeze/src/airflow_breeze/utils/parallel.py b/dev/breeze/src/airflow_breeze/utils/parallel.py
index 1d6af2d174865..349231193af7b 100644
--- a/dev/breeze/src/airflow_breeze/utils/parallel.py
+++ b/dev/breeze/src/airflow_breeze/utils/parallel.py
@@ -343,9 +343,9 @@ def run(self):
             get_console().print_exception(show_locals=True)


-def print_async_summary(completed_list: list[ApplyResult]) -> None:
+def print_async_result_status(completed_list: list[ApplyResult]) -> None:
     """
-    Print summary of completed async results.
+    Print status of completed async results.

     :param completed_list: list of completed async results.
     """
     completed_list.sort(key=lambda x: x.get()[1])
@@ -374,19 +374,26 @@ class SummarizeAfter(Enum):

 def check_async_run_results(
     results: list[ApplyResult],
-    success: str,
+    success_message: str,
     outputs: list[Output],
     include_success_outputs: bool,
     poll_time_seconds: float = 0.2,
     skip_cleanup: bool = False,
     summarize_on_ci: SummarizeAfter = SummarizeAfter.NO_SUMMARY,
     summary_start_regexp: str | None = None,
+    terminated_on_timeout: bool = False,
 ):
     """
-    Check if all async results were success. Exits with error if not.
+    Check if all async results were successful.
+
+    Exits with error if:
+
+    * exit code 1: some tasks failed
+    * exit code 2: some tasks were terminated on timeout
+
     :param results: results of parallel runs (expected in the form of Tuple: (return_code, info))
     :param outputs: outputs where results are written to
-    :param success: Success string printed when everything is OK
+    :param success_message: Success string printed when everything is OK
     :param include_success_outputs: include outputs of successful parallel runs
     :param poll_time_seconds: what's the poll time between checks
     :param skip_cleanup: whether to skip cleanup of temporary files.
@@ -395,9 +402,37 @@ def check_async_run_results(
     :param summary_start_regexp: the regexp that determines line after which
         outputs should be printed as summary, so that you do not have to look at the folded details of
         the run in CI
+    :param terminated_on_timeout: whether the run was terminated on timeout
     """
-    from airflow_breeze.utils.ci_group import ci_group
+    if terminated_on_timeout:
+        print_outputs_on_timeout(outputs, results, include_success_outputs)
+        sys.exit(2)
+    completed_list = wait_for_all_tasks_completed(poll_time_seconds, results)
+    print_async_result_status(completed_list)
+    print_logs_on_completion(include_success_outputs, outputs, results)
+    summarize_results_outside_of_folded_logs(outputs, results, summarize_on_ci, summary_start_regexp)
+    if finalize_async_tasks(outputs, results, skip_cleanup, success_message):
+        sys.exit(1)
+
+
+def print_logs_on_completion(
+    include_success_outputs: bool, outputs: list[Output], results: list[ApplyResult]
+):
+    for i, result in enumerate(results):
+        if result.get()[0] != 0:
+            message_type = MessageType.ERROR
+        else:
+            message_type = MessageType.SUCCESS
+        if message_type == MessageType.ERROR or include_success_outputs:
+            from airflow_breeze.utils.ci_group import ci_group
+
+            with ci_group(f"{outputs[i].escaped_title}", message_type):
+                os.write(1, Path(outputs[i].file_name).read_bytes())
+        else:
+            get_console().print(f"[success]{outputs[i].escaped_title} OK[/]")
+
+
+def wait_for_all_tasks_completed(poll_time_seconds: float, results: list[ApplyResult]) -> list[ApplyResult]:
     completed_number = 0
     total_number_of_results = len(results)
     completed_list = get_completed_result_list(results)
@@ -409,7 +444,7 @@ def check_async_run_results(
             f"\n[info]Completed {completed_number} out of {total_number_of_results} "
             f"({completed_number / total_number_of_results:.0%}).[/]\n"
         )
-        print_async_summary(completed_list)
+        print_async_result_status(completed_list)
         time.sleep(poll_time_seconds)
         completed_list = get_completed_result_list(results)
         completed_number = len(completed_list)
@@ -417,50 +452,113 @@ def check_async_run_results(
         f"\n[info]Completed {completed_number} out of {total_number_of_results} "
         f"({completed_number / total_number_of_results:.0%}).[/]\n"
     )
-    print_async_summary(completed_list)
+    return completed_list
+
+
+def finalize_async_tasks(
+    outputs: list[Output], results: list[ApplyResult], skip_cleanup: bool, success_message: str
+) -> bool:
+    """
+    Finalize async tasks by checking results and cleaning up temporary files.
+
+    :param outputs: List of Output objects containing file names and titles.
+    :param results: List of ApplyResult objects containing the results of the tasks.
+    :param skip_cleanup: Whether to skip cleanup of temporary files.
+    :param success_message: Message to print if all tasks were successful.
+    :return: True if there were errors, False otherwise.
+    """
     errors = False
-    for i, result in enumerate(results):
+    for result in results:
         if result.get()[0] != 0:
             errors = True
+    if errors:
+        get_console().print("\n[error]There were errors when running some tasks. Quitting.[/]\n")
+    else:
+        get_console().print(f"\n[success]{success_message}[/]\n")
+    if not skip_cleanup:
+        for output in outputs:
+            Path(output.file_name).unlink(missing_ok=True)
+    from airflow_breeze.utils.docker_command_utils import fix_ownership_using_docker
+
+    fix_ownership_using_docker()
+    return errors
+
+
+def summarize_results_outside_of_folded_logs(
+    outputs: list[Output],
+    results: list[ApplyResult],
+    summarize_on_ci: SummarizeAfter,
+    summary_start_regexp: str | None = None,
+):
+    """
+    Print summary of the results outside the folded logs in CI.
+
+    :param outputs: List of Output objects containing file names and titles.
+    :param results: List of ApplyResult objects containing the results of the tasks.
+    :param summarize_on_ci: Determines when to summarize the parallel jobs when they are completed
+        in CI, outside the folded CI output.
+    :param summary_start_regexp: The regexp that determines line after which
+        outputs should be printed as summary, so that you do not have to look at the folded details of
+        the run in CI.
+    """
+    if summarize_on_ci == SummarizeAfter.NO_SUMMARY:
+        return
+    regex = re.compile(summary_start_regexp) if summary_start_regexp is not None else None
+    for i, result in enumerate(results):
+        failure = result.get()[0] != 0
+        if summarize_on_ci in [
+            SummarizeAfter.BOTH,
+            SummarizeAfter.FAILURE if failure else SummarizeAfter.SUCCESS,
+        ]:
+            print_lines = False
+            for line in Path(outputs[i].file_name).read_bytes().decode(errors="ignore").splitlines():
+                if not print_lines and (regex is None or regex.match(remove_ansi_colours(line))):
+                    print_lines = True
+                    get_console().print(f"\n[info]Summary: {outputs[i].escaped_title:<30}:\n")
+                if print_lines:
+                    print(line)
+
+
+def print_outputs_on_timeout(
+    outputs: list[Output], results: list[ApplyResult], include_success_outputs: bool
+):
+    """
+    Print outputs of the tasks that were terminated on timeout.
+
+    This function is called when some tasks were terminated on timeout.
+    It prints the outputs of the terminated tasks, and the outputs of the
+    successful tasks if `include_success_outputs` is True.
+
+    :param outputs: list of Output objects containing file names and titles
+    :param results: list of ApplyResult objects containing the results of the tasks
+    :param include_success_outputs: whether to include outputs of successful tasks
+    """
+    get_console().print(
+        "\n[warning]Some tasks were terminated on timeout. "
+        "Please check the logs of the tasks (below) for more details.[/]\n"
+    )
+    for i, result in enumerate(results):
+        try:
+            exit_code = result.get(timeout=0)[0]
+        except Exception:
+            exit_code = -1
+        if exit_code != 0:
             message_type = MessageType.ERROR
         else:
             message_type = MessageType.SUCCESS
+        output = outputs[i]
         if message_type == MessageType.ERROR or include_success_outputs:
-            with ci_group(title=f"{outputs[i].escaped_title}", message_type=message_type):
-                os.write(1, Path(outputs[i].file_name).read_bytes())
+            from airflow_breeze.utils.ci_group import ci_group
+
+            with ci_group(f"{output.escaped_title}", message_type):
+                os.write(1, Path(output.file_name).read_bytes())
         else:
             get_console().print(f"[success]{outputs[i].escaped_title} OK[/]")
-    if summarize_on_ci != SummarizeAfter.NO_SUMMARY:
-        regex = re.compile(summary_start_regexp) if summary_start_regexp is not None else None
-        for i, result in enumerate(results):
-            failure = result.get()[0] != 0
-            if summarize_on_ci in [
-                SummarizeAfter.BOTH,
-                SummarizeAfter.FAILURE if failure else SummarizeAfter.SUCCESS,
-            ]:
-                print_lines = False
-                for line in Path(outputs[i].file_name).read_bytes().decode(errors="ignore").splitlines():
-                    if not print_lines and (regex is None or regex.match(remove_ansi_colours(line))):
-                        print_lines = True
-                        get_console().print(f"\n[info]Summary: {outputs[i].escaped_title:<30}:\n")
-                    if print_lines:
-                        print(line)
-    try:
-        if errors:
-            get_console().print("\n[error]There were errors when running some tasks. Quitting.[/]\n")
-            from airflow_breeze.utils.docker_command_utils import fix_ownership_using_docker
+    get_console().print(
+        "\n[warning]Some tasks were terminated on timeout. "
+        "Please check the logs of the tasks (above) for more details.[/]\n"
+    )
+    from airflow_breeze.utils.docker_command_utils import fix_ownership_using_docker

-            fix_ownership_using_docker()
-            sys.exit(1)
-        else:
-            get_console().print(f"\n[success]{success}[/]\n")
-            from airflow_breeze.utils.docker_command_utils import fix_ownership_using_docker
-
-            fix_ownership_using_docker()
-    finally:
-        if not skip_cleanup:
-            for output in outputs:
-                Path(output.file_name).unlink(missing_ok=True)
+    fix_ownership_using_docker()


 @contextmanager
diff --git a/dev/breeze/src/airflow_breeze/utils/publish_docs_to_s3.py b/dev/breeze/src/airflow_breeze/utils/publish_docs_to_s3.py
index 7124bdab0d231..8951da8c1b2e4 100644
--- a/dev/breeze/src/airflow_breeze/utils/publish_docs_to_s3.py
+++ b/dev/breeze/src/airflow_breeze/utils/publish_docs_to_s3.py
@@ -218,7 +218,7 @@ def run_publish(self):

         check_async_run_results(
             results=results,
-            success="All docs published successfully",
+            success_message="All docs published successfully",
             outputs=outputs,
             include_success_outputs=False,
         )
diff --git a/dev/breeze/src/airflow_breeze/utils/run_utils.py b/dev/breeze/src/airflow_breeze/utils/run_utils.py
index 805ca9b6f2ca1..7d903f4c29238 100644
--- a/dev/breeze/src/airflow_breeze/utils/run_utils.py
+++ b/dev/breeze/src/airflow_breeze/utils/run_utils.py
@@ -145,7 +145,7 @@ def shorten_command(_index: int, _argument: str) -> str:
         kwargs["stdout"] = subprocess.DEVNULL
         kwargs["stderr"] = subprocess.DEVNULL
         return subprocess.run(cmd, input=input, check=check, env=cmd_env, cwd=workdir, **kwargs)
-    with ci_group(title=f"Running command: {title}", message_type=None):
+    with ci_group(f"Running command: {title}"):
         get_console(output=output).print(f"\n[info]Working directory {workdir}\n")
         if input:
             get_console(output=output).print("[info]Input:")