36 changes: 33 additions & 3 deletions airflow-core/docs/howto/docker-compose/docker-compose.yaml
@@ -217,6 +217,7 @@ services:
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
export AIRFLOW_UID=$$(id -u)
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
@@ -251,9 +252,38 @@ services:
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /opt/airflow/{logs,dags,plugins,config}
chown -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
exec /entrypoint airflow version
echo
echo "Creating missing opt dirs if missing:"
echo
mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
echo
echo "Airflow version:"
/entrypoint airflow version
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Running airflow config list to create default config file if missing."
echo
/entrypoint airflow config list >/dev/null
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
echo
chown -R "${AIRFLOW_UID}:0" /opt/airflow/
echo
echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
echo
chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}

# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
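The warning in the init script above asks users to provide AIRFLOW_UID via a .env file before starting the stack. A minimal sketch of that one-time setup, assuming the .env file sits next to docker-compose.yaml (same effect as the echo one-liner in the quick-start docs):

import os
from pathlib import Path

# Write the .env file the init container's warning refers to, so AIRFLOW_UID
# matches the host user and files in the mounted volumes get sane ownership.
Path(".env").write_text(f"AIRFLOW_UID={os.getuid()}\n")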
26 changes: 15 additions & 11 deletions dev/breeze/doc/images/output_testing_docker-compose-tests.svg
Original file line number Diff line number Diff line change
@@ -1 +1 @@
73b209e41fec5642624d40a92fdeda71
3b806a5bfb9406969251bd457542e40a
3 changes: 3 additions & 0 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -129,6 +129,7 @@ def group_for_testing():
is_flag=True,
)
@option_github_repository
@option_include_success_outputs
@option_verbose
@option_dry_run
@click.argument("extra_pytest_args", nargs=-1, type=click.Path(path_type=str))
@@ -137,6 +138,7 @@ def docker_compose_tests(
image_name: str,
skip_docker_compose_deletion: bool,
github_repository: str,
include_success_outputs: bool,
extra_pytest_args: tuple,
):
"""Run docker-compose tests."""
@@ -147,6 +149,7 @@ def docker_compose_tests(
get_console().print(f"[info]Running docker-compose with PROD image: {image_name}[/]")
return_code, info = run_docker_compose_tests(
image_name=image_name,
include_success_outputs=include_success_outputs,
extra_pytest_args=extra_pytest_args,
skip_docker_compose_deletion=skip_docker_compose_deletion,
)
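The @option_include_success_outputs decorator used above is defined in breeze's shared option modules and is not shown in this diff; a rough sketch of what such a click flag typically looks like (the help text and envvar binding are assumptions):

import click

# Illustrative sketch only; the real definition lives in breeze's common options.
option_include_success_outputs = click.option(
    "--include-success-outputs",
    help="Also print docker-compose diagnostics when the test succeeds.",
    is_flag=True,
    envvar="INCLUDE_SUCCESS_OUTPUTS",
)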
Original file line number Diff line number Diff line change
@@ -237,6 +237,7 @@
"--image-name",
"--python",
"--skip-docker-compose-deletion",
"--include-success-outputs",
"--github-repository",
],
}
6 changes: 5 additions & 1 deletion dev/breeze/src/airflow_breeze/utils/run_tests.py
@@ -95,6 +95,7 @@ def run_docker_compose_tests(
image_name: str,
extra_pytest_args: tuple,
skip_docker_compose_deletion: bool,
include_success_outputs: bool,
) -> tuple[int, str]:
command_result = run_command(["docker", "inspect", image_name], check=False, stdout=DEVNULL)
if command_result.returncode != 0:
@@ -106,8 +107,11 @@ def run_docker_compose_tests(
env["DOCKER_IMAGE"] = image_name
if skip_docker_compose_deletion:
env["SKIP_DOCKER_COMPOSE_DELETION"] = "true"
if include_success_outputs:
env["INCLUDE_SUCCESS_OUTPUTS"] = "true"
# since we are only running one test, we can print output directly with pytest -s
command_result = run_command(
["uv", "run", "pytest", str(test_path), *pytest_args, *extra_pytest_args],
["uv", "run", "pytest", str(test_path), "-s", *pytest_args, *extra_pytest_args],
env=env,
check=False,
cwd=DOCKER_TESTS_ROOT_PATH.as_posix(),
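Together with DOCKER_IMAGE, the two flags above cross the process boundary as environment variables, and -s turns off pytest's output capture so the single test's console output streams live. A condensed, self-contained sketch of that hand-off (the pytest file name here is an assumption):

import os
import subprocess

def run_quick_start_test(image_name: str, include_success_outputs: bool) -> int:
    # Flags are passed to the test process as environment variables.
    env = os.environ.copy()
    env["DOCKER_IMAGE"] = image_name
    if include_success_outputs:
        env["INCLUDE_SUCCESS_OUTPUTS"] = "true"
    # -s disables capture so console output from the test is shown as it happens.
    completed = subprocess.run(
        ["pytest", "test_docker_compose_quick_start.py", "-s"],
        env=env,
        check=False,
    )
    return completed.returncode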
Original file line number Diff line number Diff line change
@@ -19,14 +19,14 @@
import json
import os
import shlex
from pprint import pprint
from shutil import copyfile
from time import sleep

import pytest
import requests
from python_on_whales import DockerClient, docker
from python_on_whales.exceptions import DockerException
from rich.console import Console

# isort:off (needed to workaround isort bug)
from docker_tests.command_utils import run_command
@@ -36,6 +36,8 @@

# isort:on (needed to workaround isort bug)

console = Console(width=400, color_system="standard")

DOCKER_COMPOSE_HOST_PORT = os.environ.get("HOST_PORT", "localhost:8080")
AIRFLOW_WWW_USER_USERNAME = os.environ.get("_AIRFLOW_WWW_USER_USERNAME", "airflow")
AIRFLOW_WWW_USER_PASSWORD = os.environ.get("_AIRFLOW_WWW_USER_PASSWORD", "airflow")
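This hunk also adds a module-level rich Console that replaces print/pprint throughout the file; a tiny sketch of the markup pattern adopted below (the messages and colors here are just examples):

from rich.console import Console

console = Console(width=400, color_system="standard")
console.print("[yellow]An opening tag colors the rest of the line")
console.print("[green]A scoped tag[/] reverts to the default color afterwards")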
@@ -60,13 +62,13 @@ def api_request(


def wait_for_terminal_dag_state(dag_id, dag_run_id):
print(f" Simplified representation of DAG {dag_id} ".center(72, "="))
pprint(api_request("GET", f"dags/{DAG_ID}/details"))
console.print(f"[bright_blue]Simplified representation of DAG {dag_id} ".center(72, "="))
console.print(api_request("GET", f"dags/{DAG_ID}/details"))

# Wait 400 seconds
for _ in range(400):
dag_state = api_request("GET", f"dags/{dag_id}/dagRuns/{dag_run_id}").get("state")
print(f"Waiting for DAG Run: dag_state={dag_state}")
console.print(f"Waiting for DAG Run: dag_state={dag_state}")
sleep(1)
if dag_state in ("success", "failed"):
break
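api_request is defined earlier in the module and is outside this hunk; a sketch consistent with the calls above (the /api/v2 prefix and basic-auth scheme are assumptions based on the constants defined at the top of the file):

import os
import requests

DOCKER_COMPOSE_HOST_PORT = os.environ.get("HOST_PORT", "localhost:8080")
AIRFLOW_WWW_USER_USERNAME = os.environ.get("_AIRFLOW_WWW_USER_USERNAME", "airflow")
AIRFLOW_WWW_USER_PASSWORD = os.environ.get("_AIRFLOW_WWW_USER_PASSWORD", "airflow")

def api_request(method: str, path: str, **kwargs) -> dict:
    # Thin wrapper over requests with basic auth against the Airflow REST API
    # exposed by the quick-start compose stack.
    response = requests.request(
        method=method,
        url=f"http://{DOCKER_COMPOSE_HOST_PORT}/api/v2/{path}",
        auth=(AIRFLOW_WWW_USER_USERNAME, AIRFLOW_WWW_USER_PASSWORD),
        headers={"Content-Type": "application/json"},
        **kwargs,
    )
    response.raise_for_status()
    return response.json()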
@@ -76,20 +78,24 @@ def test_trigger_dag_and_wait_for_result(default_docker_image, tmp_path_factory,
"""Simple test which reproduce setup docker-compose environment and trigger example dag."""
tmp_dir = tmp_path_factory.mktemp("airflow-quick-start")
monkeypatch.setenv("AIRFLOW_IMAGE_NAME", default_docker_image)
console.print(f"[yellow]Tests are run in {tmp_dir}")

compose_file_path = (
AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" / "docker-compose.yaml"
)
copyfile(compose_file_path, tmp_dir / "docker-compose.yaml")

subfolders = ("dags", "logs", "plugins", "config")
console.print(f"[yellow]Cleaning subfolders:[/ {subfolders}")
# Create required directories for docker compose quick start howto
for subdir in ("dags", "logs", "plugins"):
for subdir in ("dags", "logs", "plugins", "config"):
(tmp_dir / subdir).mkdir()

dot_env_file = tmp_dir / ".env"
console.print(f"[yellow]Creating .env file :[/ {dot_env_file}")
dot_env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n")
print(" .env file content ".center(72, "="))
print(dot_env_file.read_text())
console.print(" .env file content ".center(72, "="))
console.print(dot_env_file.read_text())

compose_version = None
try:
@@ -101,10 +107,14 @@ def test_trigger_dag_and_wait_for_result(default_docker_image, tmp_path_factory,
except NotImplementedError:
docker_version = run_command(["docker", "version"], return_output=True)

console.print("[yellow] Shutting down previous instances of quick-start docker compose")
compose = DockerClient(compose_project_name="quick-start", compose_project_directory=tmp_dir).compose
compose.down(remove_orphans=True, volumes=True, quiet=True)
try:
console.print("[yellow] Starting docker compose")
compose.up(detach=True, wait=True, color=not os.environ.get("NO_COLOR"))
console.print("[green] Docker compose started")

# Before we proceed, let's make sure our DAG has been parsed
compose.execute(service="airflow-dag-processor", command=["airflow", "dags", "reserialize"])

@@ -118,36 +128,53 @@ def test_trigger_dag_and_wait_for_result(default_docker_image, tmp_path_factory,
wait_for_terminal_dag_state(dag_id=DAG_ID, dag_run_id=DAG_RUN_ID)
dag_state = api_request("GET", f"dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}").get("state")
assert dag_state == "success"
if os.environ.get("INCLUDE_SUCCESS_OUTPUTS", "") == "true":
print_diagnostics(compose, compose_version, docker_version)
except Exception:
print("HTTP: GET health")
pprint(api_request("GET", "monitor/health"))
print(f"HTTP: GET dags/{DAG_ID}/dagRuns")
pprint(api_request("GET", f"dags/{DAG_ID}/dagRuns"))
print(f"HTTP: GET dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances")
pprint(api_request("GET", f"dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances"))
print(" Docker Version ".center(72, "="))
print(docker_version)
print(" Docker Compose Version ".center(72, "="))
print(compose_version)
print(" Compose Config ".center(72, "="))
print(json.dumps(compose.config(return_json=True), indent=4))

for service in compose.ps(all=True):
print(f" Service: {service.name} ".center(72, "-"))
print(" Service State ".center(72, "."))
pprint(service.state)
print(" Service Config ".center(72, "."))
pprint(service.config)
print(" Service Logs ".center(72, "."))
print(service.logs())
print_diagnostics(compose, compose_version, docker_version)
raise
finally:
if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
console.print(
"[yellow] Deleting docker compose instance (you can avoid that by passing "
"--skip-docker-compose-deletion flag in `breeze testing docker-compose` or "
'by setting SKIP_DOCKER_COMPOSE_DELETION environment variable to "true")'
)
compose.down(remove_orphans=True, volumes=True, quiet=True)
print("Docker compose instance deleted")
console.print("[green]Docker compose instance deleted")
else:
print("Skipping docker-compose deletion")
print()
print("You can run inspect your docker-compose by running commands starting with:")
console.print("[yellow]Skipping docker-compose deletion")
console.print()
console.print(
"[yellow]You can run inspect your docker-compose by running commands starting with:"
)
console.print()
quoted_command = map(shlex.quote, map(str, compose.docker_compose_cmd))
print(" ".join(quoted_command))
console.print(" ".join(quoted_command))


def print_diagnostics(compose: DockerClient, compose_version: str, docker_version: str):
console.print("HTTP: GET health")
try:
console.print(api_request("GET", "monitor/health"))
console.print(f"HTTP: GET dags/{DAG_ID}/dagRuns")
console.print(api_request("GET", f"dags/{DAG_ID}/dagRuns"))
console.print(f"HTTP: GET dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances")
console.print(api_request("GET", f"dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances"))
except Exception as e:
console.print(f"Failed to get health check: {e}")
console.print(" Docker Version ".center(72, "="))
console.print(docker_version)
console.print(" Docker Compose Version ".center(72, "="))
console.print(compose_version)
console.print(" Compose Config ".center(72, "="))
console.print(json.dumps(compose.config(return_json=True), indent=4))
for service in compose.ps(all=True):
console.print(f"Service: {service.name} ".center(72, "="))
console.print(f" Service State {service.name}".center(50, "."))
console.print(service.state)
console.print(f" Service Config {service.name}".center(50, "."))
console.print(service.config)
console.print(f" Service Logs {service.name}".center(50, "."))
console.print(service.logs())
console.print(f"End of service: {service.name} ".center(72, "="))