Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ jobs:
AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}"
VERBOSE: "true"
DEFAULT_BRANCH: "${{ inputs.default-branch }}"
TOTAL_TEST_TIMEOUT: "3600" # 60 minutes in seconds
if: inputs.test-group == 'core' || inputs.skip-providers-tests != 'true'
steps:
- name: "Cleanup repo"
Expand Down
4 changes: 2 additions & 2 deletions dev/breeze/src/airflow_breeze/commands/ci_image_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def run_build_in_parallel(
]
check_async_run_results(
results=results,
success="All images built correctly",
success_message="All images built correctly",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down Expand Up @@ -505,7 +505,7 @@ def run_verify_in_parallel(
]
check_async_run_results(
results=results,
success="All images verified",
success_message="All images verified",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down
14 changes: 7 additions & 7 deletions dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def create_cluster(
]
check_async_run_results(
results=results,
success="All clusters created.",
success_message="All clusters created.",
outputs=outputs,
skip_cleanup=skip_cleanup,
include_success_outputs=include_success_outputs,
Expand Down Expand Up @@ -699,7 +699,7 @@ def build_k8s_image(
]
check_async_run_results(
results=results,
success="All K8S images built correctly.",
success_message="All K8S images built correctly.",
outputs=outputs,
skip_cleanup=skip_cleanup,
include_success_outputs=include_success_outputs,
Expand Down Expand Up @@ -776,7 +776,7 @@ def upload_k8s_image(
]
check_async_run_results(
results=results,
success="All K8S images uploaded correctly.",
success_message="All K8S images uploaded correctly.",
outputs=outputs,
skip_cleanup=skip_cleanup,
include_success_outputs=include_success_outputs,
Expand Down Expand Up @@ -961,7 +961,7 @@ def configure_cluster(
]
check_async_run_results(
results=results,
success="All clusters configured correctly.",
success_message="All clusters configured correctly.",
outputs=outputs,
skip_cleanup=skip_cleanup,
include_success_outputs=include_success_outputs,
Expand Down Expand Up @@ -1228,7 +1228,7 @@ def deploy_airflow(
]
check_async_run_results(
results=results,
success="All Airflow charts successfully deployed.",
success_message="All Airflow charts successfully deployed.",
outputs=outputs,
skip_cleanup=skip_cleanup,
include_success_outputs=include_success_outputs,
Expand Down Expand Up @@ -1570,7 +1570,7 @@ def kubernetes_tests_command(
]
check_async_run_results(
results=results,
success="All K8S tests successfully completed.",
success_message="All K8S tests successfully completed.",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down Expand Up @@ -1822,7 +1822,7 @@ def run_complete_tests(
]
check_async_run_results(
results=results,
success="All K8S tests successfully completed.",
success_message="All K8S tests successfully completed.",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def run_build_in_parallel(
]
check_async_run_results(
results=results,
success="All images built correctly",
success_message="All images built correctly",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down Expand Up @@ -527,7 +527,7 @@ def run_verify_in_parallel(
]
check_async_run_results(
results=results,
success="All images verified",
success_message="All images verified",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ def run_generate_constraints_in_parallel(
]
check_async_run_results(
results=results,
success="All constraints are generated.",
success_message="All constraints are generated.",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down Expand Up @@ -1578,7 +1578,7 @@ def install_provider_distributions(
]
check_async_run_results(
results=results,
success="All packages installed successfully",
success_message="All packages installed successfully",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down
6 changes: 3 additions & 3 deletions dev/breeze/src/airflow_breeze/commands/sbom_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def _dir_exists_warn_and_should_skip(dir: Path, force: bool) -> bool:
]
check_async_run_results(
results=results,
success="All SBOMs were generated successfully",
success_message="All SBOMs were generated successfully",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down Expand Up @@ -601,7 +601,7 @@ def build_all_airflow_images(
]
check_async_run_results(
results=results,
success="All airflow base images were built successfully",
success_message="All airflow base images were built successfully",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down Expand Up @@ -750,7 +750,7 @@ def generate_providers_requirements(
]
check_async_run_results(
results=results,
success="Providers requirements were generated successfully",
success_message="Providers requirements were generated successfully",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down
118 changes: 80 additions & 38 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import os
import signal
import sys
from collections.abc import Generator
from datetime import datetime
from multiprocessing.pool import Pool
from time import sleep

import click
Expand Down Expand Up @@ -270,7 +272,10 @@ def _run_test(
def _get_project_names(shell_params: ShellParams) -> tuple[str, str]:
"""Return compose project name and project name."""
project_name = file_name_from_test_type(shell_params.test_type)
compose_project_name = f"airflow-test-{project_name}"
if shell_params.test_type == ALL_TEST_TYPE:
compose_project_name = "airflow-test"
else:
compose_project_name = f"airflow-test-{project_name}"
return compose_project_name, project_name


Expand All @@ -283,16 +288,18 @@ def _dump_container_logs(output: Output | None, shell_params: ShellParams):
text=True,
)
container_ids = ps_result.stdout.splitlines()
get_console(output=output).print("[info]Wait 10 seconds for logs to find their way to stderr.\n")
get_console(output=output).print("[warning]Wait 10 seconds for logs to find their way to stderr.\n")
sleep(10)
compose_project_name, project_name = _get_project_names(shell_params)
get_console(output=output).print(f"[info]Dumping containers: {container_ids} for {project_name}.\n")
get_console(output=output).print(
f"[warning]Dumping container logs: {container_ids} for compose project {compose_project_name}.\n"
)
date_str = datetime.now().strftime("%Y_%d_%m_%H_%M_%S")
for container_id in container_ids:
if compose_project_name not in container_id:
continue
dump_path = FILES_PATH / f"container_logs_{container_id}_{date_str}.log"
get_console(output=output).print(f"[info]Dumping container {container_id} to {dump_path}\n")
get_console(output=output).print(f"[info]Dumping container log {container_id} to {dump_path}\n")
with open(dump_path, "w") as outfile:
run_command(
["docker", "logs", "--details", "--timestamps", container_id],
Expand All @@ -312,6 +319,7 @@ def _run_tests_in_pool(
skip_docker_compose_down: bool,
test_timeout: int,
tests_to_run: list[str],
handler: TimeoutHandler,
):
if not tests_to_run:
return
Expand Down Expand Up @@ -349,6 +357,7 @@ def _run_tests_in_pool(
lines_to_search=400,
),
) as (pool, outputs):
handler.set_pool(pool)
results = [
pool.apply_async(
_run_test,
Expand All @@ -366,12 +375,13 @@ def _run_tests_in_pool(
escaped_tests = [test.replace("[", "\\[") for test in tests_to_run]
check_async_run_results(
results=results,
success=f"Tests {' '.join(escaped_tests)} completed successfully",
success_message=f"Tests {' '.join(escaped_tests)} completed successfully",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
summarize_on_ci=SummarizeAfter.FAILURE,
summary_start_regexp=r".*= FAILURES.*|.*= ERRORS.*",
terminated_on_timeout=handler.terminated_on_timeout_output_list[0],
)


Expand All @@ -395,6 +405,7 @@ def run_tests_in_parallel(
parallelism: int,
skip_cleanup: bool,
skip_docker_compose_down: bool,
handler: TimeoutHandler,
) -> None:
get_console().print("\n[info]Summary of the tests to run\n")
get_console().print(f"[info]Running tests in parallel with parallelism={parallelism}")
Expand All @@ -417,6 +428,7 @@ def run_tests_in_parallel(
debug_resources=debug_resources,
skip_cleanup=skip_cleanup,
skip_docker_compose_down=skip_docker_compose_down,
handler=handler,
)


Expand Down Expand Up @@ -1255,43 +1267,29 @@ def python_api_client_tests(
sys.exit(returncode)


@contextlib.contextmanager
def run_with_timeout(timeout: int, shell_params: ShellParams):
def timeout_handler(signum, frame):
get_console().print("[warning]Timeout reached. Killing the container(s)[/]:")
_print_all_containers()
if os.environ.get("CI") == "true":
get_console().print("[warning]Dumping container logs first[/]:")
_dump_container_logs(output=None, shell_params=shell_params)
list_of_containers = _get_running_containers().stdout.splitlines()
get_console().print("[warning]Attempting to send TERM signal to all remaining containers:")
get_console().print(list_of_containers)
_send_signal_to_containers(list_of_containers, "SIGTERM")
get_console().print(f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop")
sleep(GRACE_CONTAINER_STOP_TIMEOUT)
containers_left = _get_running_containers().stdout.splitlines()
if containers_left:
get_console().print("[warning]Some containers are still running. Killing them with SIGKILL:")
get_console().print(containers_left)
_send_signal_to_containers(list_of_containers, "SIGKILL")
get_console().print(
f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop"
)
sleep(GRACE_CONTAINER_STOP_TIMEOUT)
containers_left = _get_running_containers().stdout.splitlines()
if containers_left:
get_console().print("[error]Some containers are still running. Exiting anyway.")
get_console().print(containers_left)
sys.exit(1)
class TimeoutHandler:
def __init__(self, shell_params: ShellParams, terminated_on_timeout_output_list: list[bool]):
# Initialize the timeout handler with shell parameters and a list to track terminated outputs.
# The terminated_on_timeout_output_list is used to signal to the outside world that the
# output has been terminated, by setting the first element to True when the timeout is reached.
self.shell_params = shell_params
self.pool: Pool | None = None
self.terminated_on_timeout_output_list = terminated_on_timeout_output_list
self.terminated_on_timeout_output_list[0] = False

def _send_signal_to_containers(list_of_containers: list[str], signal: str):
def set_pool(self, pool: Pool):
self.pool = pool

@staticmethod
def _send_signal_to_containers(list_of_containers: list[str], signal_number: str):
run_command(
["docker", "kill", "--signal", signal, *list_of_containers],
["docker", "kill", "--signal", signal_number, *list_of_containers],
check=True,
capture_output=False,
text=True,
)

@staticmethod
def _get_running_containers() -> RunCommandResult:
return run_command(
["docker", "ps", "-q"],
Expand All @@ -1300,16 +1298,59 @@ def _get_running_containers() -> RunCommandResult:
text=True,
)

@staticmethod
def _print_all_containers():
run_command(
["docker", "ps"],
check=True,
)

signal.signal(signal.SIGALRM, timeout_handler)
def timeout_method(self, signum, frame):
get_console().print("[warning]Timeout reached.[/]")
if self.pool:
get_console().print("[warning]Terminating the pool[/]")
self.pool.close()
self.pool.terminate()
# No join here. The pool is joined already in the main function
get_console().print("[warning]Stopping all running containers[/]:")
self._print_all_containers()
if os.environ.get("CI") == "true":
get_console().print("[warning]Dumping container logs first[/]")
_dump_container_logs(output=None, shell_params=self.shell_params)
list_of_containers = self._get_running_containers().stdout.splitlines()
get_console().print("[warning]Attempting to send TERM signal to all remaining containers:")
get_console().print(list_of_containers)
self._send_signal_to_containers(list_of_containers, "SIGTERM")
get_console().print(f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop")
sleep(GRACE_CONTAINER_STOP_TIMEOUT)
containers_left = self._get_running_containers().stdout.splitlines()
if containers_left:
get_console().print("[warning]Some containers are still running. Killing them with SIGKILL:")
get_console().print(containers_left)
self._send_signal_to_containers(list_of_containers, "SIGKILL")
get_console().print(
f"[warning]Waiting {GRACE_CONTAINER_STOP_TIMEOUT} seconds for containers to stop"
)
sleep(GRACE_CONTAINER_STOP_TIMEOUT)
containers_left = self._get_running_containers().stdout.splitlines()
if containers_left:
get_console().print(
"[error]Some containers are still running. Marking stuff as terminated anyway."
)
get_console().print(containers_left)
get_console().print(
"[warning]All containers stopped. Marking the whole run as terminated on timeout[/]"
)
self.terminated_on_timeout_output_list[0] = True


@contextlib.contextmanager
def run_with_timeout(timeout: int, shell_params: ShellParams) -> Generator[TimeoutHandler, None, None]:
timeout_handler = TimeoutHandler(shell_params=shell_params, terminated_on_timeout_output_list=[False])
signal.signal(signal.SIGALRM, lambda signum, frame: timeout_handler.timeout_method(signum, frame))
signal.alarm(timeout)
try:
yield
yield timeout_handler
finally:
signal.alarm(0)

Expand Down Expand Up @@ -1429,7 +1470,7 @@ def _run_test_command(
f"Your test type = {test_type}\n"
)
sys.exit(1)
with run_with_timeout(total_test_timeout, shell_params=shell_params):
with run_with_timeout(total_test_timeout, shell_params=shell_params) as handler:
run_tests_in_parallel(
shell_params=shell_params,
extra_pytest_args=extra_pytest_args,
Expand All @@ -1439,6 +1480,7 @@ def _run_test_command(
skip_cleanup=skip_cleanup,
debug_resources=debug_resources,
skip_docker_compose_down=skip_docker_compose_down,
handler=handler,
)
else:
if shell_params.test_type == ALL_TEST_TYPE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
def debug_pyproject_tomls(pyproject_toml_paths: list[Path]) -> None:
if get_verbose() or get_dry_run():
for pyproject_toml_path in pyproject_toml_paths:
with ci_group(f"Updated {pyproject_toml_path} content", message_type=MessageType.INFO):
with ci_group(f"Updated {pyproject_toml_path} content", MessageType.INFO):
# Format the content to make it more readable with rich
syntax = Syntax(pyproject_toml_path.read_text(), "toml", theme="ansi_dark", line_numbers=True)
get_console().print(syntax)
2 changes: 1 addition & 1 deletion dev/breeze/src/airflow_breeze/utils/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def get_kwds(index: int, image_param: BuildCiParams | BuildProdParams):
]
check_async_run_results(
results=results,
success="All images pulled",
success_message="All images pulled",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
Expand Down
Loading