From f7b5e0c8b44707f2f7b2895cfc4bf6203cd8498f Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Tue, 30 Sep 2025 01:01:42 +0100 Subject: [PATCH 01/10] Prefetch remote log connection id for api server in order to read remote logs --- .../workflows/additional-prod-image-tests.yml | 12 +- .github/workflows/airflow-e2e-tests.yml | 9 + .../airflow/utils/log/connection_manager.py | 93 +++++ .../airflow/utils/log/file_task_handler.py | 8 +- .../docker-compose-with-localstack.yaml | 388 ++++++++++++++++++ .../tests/airflow_e2e_tests/conftest.py | 9 +- .../tests/airflow_e2e_tests/constants.py | 15 +- .../e2e_test_utils/clients.py | 7 + .../remote_log_tests/__init__.py | 16 + .../remote_log_tests/test_remote_logging.py | 81 ++++ .../output_testing_airflow-e2e-tests.svg | 32 +- .../output_testing_airflow-e2e-tests.txt | 2 +- .../commands/testing_commands.py | 13 + .../commands/testing_commands_config.py | 1 + .../src/airflow_breeze/utils/run_tests.py | 4 +- 15 files changed, 663 insertions(+), 27 deletions(-) create mode 100644 airflow-core/src/airflow/utils/log/connection_manager.py create mode 100644 airflow-e2e-tests/docker/docker-compose-with-localstack.yaml create mode 100644 airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/__init__.py create mode 100644 airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py diff --git a/.github/workflows/additional-prod-image-tests.yml b/.github/workflows/additional-prod-image-tests.yml index a6d0f890d1117..9eba0fdf9dc28 100644 --- a/.github/workflows/additional-prod-image-tests.yml +++ b/.github/workflows/additional-prod-image-tests.yml @@ -192,7 +192,7 @@ jobs: - name: "Run Task SDK integration tests" run: breeze testing task-sdk-integration-tests - test-e2e-integration-tests: + test-e2e-integration-tests-basic: name: "Test e2e integration tests with PROD image" uses: ./.github/workflows/airflow-e2e-tests.yml with: @@ -200,3 +200,13 @@ jobs: platform: ${{ inputs.platform }} default-python-version: "${{ inputs.default-python-version }}" use-uv: ${{ inputs.use-uv }} + + test-e2e-integration-tests-remote-log: + name: "Test e2e integration tests with PROD image (remote logging)" + uses: ./.github/workflows/airflow-e2e-tests.yml + with: + runners: ${{ inputs.runners }} + platform: ${{ inputs.platform }} + default-python-version: "${{ inputs.default-python-version }}" + use-uv: ${{ inputs.use-uv }} + e2e_test_mode: "remote_log" diff --git a/.github/workflows/airflow-e2e-tests.yml b/.github/workflows/airflow-e2e-tests.yml index 31e6758fdaaae..73f031276d39a 100644 --- a/.github/workflows/airflow-e2e-tests.yml +++ b/.github/workflows/airflow-e2e-tests.yml @@ -44,6 +44,10 @@ on: # yamllint disable-line rule:truthy description: "Tag of the Docker image to test" type: string required: true + e2e_test_mode: + description: "Test mode - basic or remote_log" + type: string + default: "basic" workflow_call: inputs: @@ -67,6 +71,10 @@ on: # yamllint disable-line rule:truthy description: "Tag of the Docker image to test" type: string default: "" + e2e_test_mode: + description: "Test mode - basic or remote_log" + type: string + default: "basic" jobs: test-e2e-integration-tests: @@ -101,6 +109,7 @@ jobs: run: breeze testing airflow-e2e-tests env: DOCKER_IMAGE: "${{ inputs.docker-image-tag }}" + E2E_TEST_MODE: "${{ inputs.e2e_test_mode }}" - name: "Upload e2e test report" uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 with: diff --git a/airflow-core/src/airflow/utils/log/connection_manager.py
b/airflow-core/src/airflow/utils/log/connection_manager.py new file mode 100644 index 0000000000000..847763d0eee6f --- /dev/null +++ b/airflow-core/src/airflow/utils/log/connection_manager.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Connection management for server-side operations like remote log reading.""" + +from __future__ import annotations + +import contextlib +import logging +import os +from functools import lru_cache + +from airflow.configuration import conf + +log = logging.getLogger(__name__) + + +@lru_cache +def _get_remote_logging_connection_uri(conn_id: str) -> str | None: + """ + Fetch and cache connection URI for remote logging. + + Similar to task-sdk supervisor pattern, but uses airflow-core connection access. + """ + from airflow.models.connection import Connection + + try: + conn = Connection.get_connection_from_secrets(conn_id) + return conn.get_uri() + except Exception: + log.exception("Unable to retrieve remote logging connection %s", conn_id) + return None + + +def _get_remote_log_conn_id() -> str | None: + """Get the remote log connection ID from configuration.""" + return conf.get("logging", "remote_log_conn_id", fallback=None) + + +@contextlib.contextmanager +def with_remote_logging_connection(): + """ + Context manager to pre-fetch remote logging connection and set as environment variable. + + This follows the same pattern as task-sdk supervisor's _remote_logging_conn but uses + airflow-core's connection access. When remote log handlers try to get connections, + they'll find them in the environment variables instead of trying to use SUPERVISOR_COMMS. 
+ + Usage: + with with_remote_logging_connection(): + # Remote log handlers will find connections in env vars + sources, logs = remote_io.read(path, ti) + """ + conn_id = _get_remote_log_conn_id() + if not conn_id: + # No remote logging connection configured + yield + return + + # Get connection URI using server-side access + conn_uri = _get_remote_logging_connection_uri(conn_id) + if not conn_uri: + log.warning("Could not fetch remote logging connection %s", conn_id) + yield + return + + env_key = f"AIRFLOW_CONN_{conn_id.upper()}" + old_value = os.getenv(env_key) + + try: + os.environ[env_key] = conn_uri + log.debug("Set remote logging connection %s in environment", conn_id) + yield + finally: + # Restore original environment state + if old_value is None: + if env_key in os.environ: + del os.environ[env_key] + else: + os.environ[env_key] = old_value diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 9954688a41f2b..9eb5a293e1c74 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -40,6 +40,7 @@ from airflow.configuration import conf from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.helpers import parse_template_string, render_template +from airflow.utils.log.connection_manager import with_remote_logging_connection from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator from airflow.utils.log.logging_mixin import SetContextPropagate from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler @@ -932,6 +933,7 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse # This living here is not really a good plan, but it just about works for now. # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler. - path = self._render_filename(ti, try_number) - sources, logs = remote_io.read(path, ti) - return sources, logs or [] + with with_remote_logging_connection(): + path = self._render_filename(ti, try_number) + sources, logs = remote_io.read(path, ti) + return sources, logs or [] diff --git a/airflow-e2e-tests/docker/docker-compose-with-localstack.yaml b/airflow-e2e-tests/docker/docker-compose-with-localstack.yaml new file mode 100644 index 0000000000000..49bc2be1b626e --- /dev/null +++ b/airflow-e2e-tests/docker/docker-compose-with-localstack.yaml @@ -0,0 +1,388 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. +# +# WARNING: This configuration is for local development. Do not use it in a production deployment. 
+# +# This configuration supports basic configuration using environment variables or an .env file +# The following variables are supported: +# +# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. +# Default: apache/airflow:|version| +# AIRFLOW_UID - User ID in Airflow containers +# Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . +# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode +# +# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). +# Default: airflow +# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). +# Default: airflow +# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. +# Default: '' +# +# Feel free to modify this file to suit your needs. +--- +x-airflow-common: + &airflow-common + # In order to add custom dependencies or upgrade provider distributions you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:|version|} + # build: . + environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'true' + AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/' + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks + # for other purpose (development, test and especially production usage) build/extend Airflow image. 
+ _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + AIRFLOW__LOGGING__REMOTE_LOGGING: 'true' + AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: 'aws_s3_logs' + AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: 's3://test-airflow-logs' + AIRFLOW__LOGGING__LOGGING_LEVEL: 'INFO' + # AWS configuration for LocalStack + AWS_ACCESS_KEY_ID: 'test' + AWS_SECRET_ACCESS_KEY: 'test' + AWS_DEFAULT_REGION: 'us-east-1' + AWS_ENDPOINT_URL_S3: 'http://localstack:4566' + # The following line can be used to set a custom config file, stored in the local config folder + AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg' + volumes: + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + localstack: + condition: service_healthy + +services: + postgres: + image: postgres:16 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + # Redis is limited to 7.2-bookworm due to licencing change + # https://redis.io/blog/redis-adopts-dual-source-available-licensing/ + image: redis:7.2-bookworm + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-apiserver: + <<: *airflow-common + command: api-server + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-dag-processor: + <<: *airflow-common + command: dag-processor + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + # yamllint disable rule:line-length + test: + - "CMD-SHELL" + - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-apiserver: + condition: service_healthy + airflow-init: + condition: 
service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}" + image: localstack/localstack:4.7 + labels: + breeze.description: "Integration that emulates AWS services locally." + ports: + - "4566:4566" # LocalStack Gateway + - "4510-4559:4510-4559" # external services port range + environment: + - DEBUG=${DEBUG:-0} + - SERVICES=s3 + - AWS_DEFAULT_REGION=us-east-1 + - AWS_ACCESS_KEY_ID=test + - AWS_SECRET_ACCESS_KEY=test + volumes: + - "/var/run/docker.sock:/var/run/docker.sock" + - localstack-data:/var/lib/localstack + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 10s + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + # yamllint disable rule:line-length + command: + - -c + - | + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." + echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + export AIRFLOW_UID=$$(id -u) + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" + echo + fi + echo + echo "Creating missing opt dirs if missing:" + echo + mkdir -v -p /opt/airflow/{logs,dags,plugins,config} + echo + echo "Airflow version:" + /entrypoint airflow version + echo + echo "Files in shared volumes:" + echo + ls -la /opt/airflow/{logs,dags,plugins,config} + echo + echo "Running airflow config list to create default config file if missing." 
+ echo + /entrypoint airflow config list >/dev/null + echo + echo "Files in shared volumes:" + echo + ls -la /opt/airflow/{logs,dags,plugins,config} + echo + echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0" + echo + chown -R "${AIRFLOW_UID}:0" /opt/airflow/ + echo + echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0" + echo + chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config} + echo + echo "Files in shared volumes:" + echo + ls -la /opt/airflow/{logs,dags,plugins,config} + echo "Waiting for LocalStack to be ready..." + + until curl -f http://localstack:4566/_localstack/health; do + echo "LocalStack not ready, waiting..." + sleep 2 + done + uv pip install awscli + aws --endpoint-url=http://localstack:4566 s3 mb s3://test-airflow-logs || echo "Bucket creation failed or already exists" + + echo "Creating Airflow S3 connection..." + /entrypoint airflow connections add aws_s3_logs \ + --conn-type aws \ + --conn-login test \ + --conn-password test \ + --conn-extra '{"endpoint_url": "http://localstack:4566", "region_name": "us-east-1"}' || echo "Connection creation failed or already exists" + + echo "Remote logging S3 setup completed" + + # yamllint enable rule:line-length + environment: + <<: *airflow-common-env + _AIRFLOW_DB_MIGRATE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' + user: "0:0" + + airflow-cli: + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + depends_on: + <<: *airflow-common-depends-on + + # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up + # or by explicitly targeted on the command line e.g. docker-compose up flower. 
+ # See: https://docs.docker.com/compose/profiles/ + flower: + <<: *airflow-common + command: celery flower + profiles: + - flower + ports: + - "5555:5555" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:5555/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + +volumes: + postgres-db-volume: + localstack-data: diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py index d63c17e87a909..ce8806a8e68f2 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py @@ -26,8 +26,8 @@ from testcontainers.compose import DockerCompose from airflow_e2e_tests.constants import ( - AIRFLOW_ROOT_PATH, DOCKER_COMPOSE_HOST_PORT, + DOCKER_COMPOSE_PATH, DOCKER_IMAGE, E2E_DAGS_FOLDER, LOGS_FOLDER, @@ -44,11 +44,8 @@ def spin_up_airflow_environment(tmp_path_factory): global airflow_logs_path tmp_dir = tmp_path_factory.mktemp("airflow-e2e-tests") - compose_file_path = ( - AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" / "docker-compose.yaml" - ) - - copyfile(compose_file_path, tmp_dir / "docker-compose.yaml") + console.print(f"[yellow]Using docker compose file: {DOCKER_COMPOSE_PATH}") + copyfile(DOCKER_COMPOSE_PATH, tmp_dir / "docker-compose.yaml") subfolders = ("dags", "logs", "plugins", "config") diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py index 54225059e0fa6..c55a3b284ae62 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py @@ -27,7 +27,9 @@ DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE") or DEFAULT_DOCKER_IMAGE os.environ["AIRFLOW_UID"] = str(os.getuid()) -DOCKER_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" +DEFAULT_DOCKER_COMPOSE_PATH = ( + AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" / "docker-compose.yaml" +) AIRFLOW_WWW_USER_USERNAME = os.environ.get("_AIRFLOW_WWW_USER_USERNAME", "airflow") AIRFLOW_WWW_USER_PASSWORD = os.environ.get("_AIRFLOW_WWW_USER_PASSWORD", "airflow") @@ -37,3 +39,14 @@ # The logs folder where the Airflow logs will be copied to and uploaded to github artifacts LOGS_FOLDER = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "logs" TEST_REPORT_FILE = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "_e2e_test_report.json" + +E2E_TEST_MODE = os.environ.get("E2E_TEST_MODE", "basic") + +DOCKER_COMPOSE_PATH = None + +if E2E_TEST_MODE == "basic": + DOCKER_COMPOSE_PATH = DEFAULT_DOCKER_COMPOSE_PATH +elif E2E_TEST_MODE == "remote_log": + DOCKER_COMPOSE_PATH = ( + AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "docker-compose-with-localstack.yaml" + ) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py index 6d7757263475a..32abeb5331d78 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py @@ -121,6 +121,13 @@ def trigger_dag_and_wait(self, dag_id: str, json=None): run_id=resp["dag_run_id"], ) + def get_task_logs(self, dag_id: str, run_id: str, task_id: str, try_number: int = 1): + """Get task logs via API.""" + return self._make_request( + method="GET", + 
endpoint=f"dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs/{try_number}", + ) + class TaskSDKClient: """Client for interacting with the Task SDK API.""" diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/__init__.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py new file mode 100644 index 0000000000000..f8ab78f27a8f4 --- /dev/null +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime, timezone + +import boto3 +import pytest + +from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient + + +class TestRemoteLogging: + airflow_client = AirflowClient() + dag_id = "example_xcom_test" + + def test_dag_unpause(self): + self.airflow_client.un_pause_dag( + TestRemoteLogging.dag_id, + ) + + def test_remote_logging_s3(self): + """Test that a DAG using remote logging to S3 completes successfully.""" + + self.airflow_client.un_pause_dag(TestRemoteLogging.dag_id) + + resp = self.airflow_client.trigger_dag( + TestRemoteLogging.dag_id, json={"logical_date": datetime.now(timezone.utc).isoformat()} + ) + state = self.airflow_client.wait_for_dag_run( + dag_id=TestRemoteLogging.dag_id, + run_id=resp["dag_run_id"], + ) + + assert state == "success", ( + f"DAG {TestRemoteLogging.dag_id} did not complete successfully. 
Final state: {state}" ) + + task_logs = self.airflow_client.get_task_logs( + dag_id=TestRemoteLogging.dag_id, + task_id="bash_pull", + run_id=resp["dag_run_id"], + ) + + task_log_sources = task_logs.get("content", [{}])[0].get("sources", []) + + s3_client = boto3.client( + "s3", + endpoint_url="http://localhost:4566", + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + ) + + # This bucket is created as part of the docker-compose setup (in the airflow-init service) + bucket_name = "test-airflow-logs" + response = s3_client.list_objects_v2(Bucket=bucket_name) + + if "Contents" not in response: + pytest.fail(f"No objects found in S3 bucket {bucket_name}") + + # S3 key format: dag_id=example_xcom/run_id=manual__2025-09-29T23:32:09.457215+00:00/task_id=bash_pull/attempt=1.log + + log_files = [f"s3://{bucket_name}/{obj['Key']}" for obj in response["Contents"]] + assert any(source in log_files for source in task_log_sources), ( + f"None of the log sources {task_log_sources} were found in S3 bucket logs {log_files}" + ) diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg index 9abcda1d332f1..7fc70e474b999 100644 --- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg +++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg @@ -1,4 +1,4 @@ - +
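
For reference, a minimal standalone sketch of the environment-variable round-trip that with_remote_logging_connection() in connection_manager.py relies on: the connection URI is exported as AIRFLOW_CONN_<CONN_ID> for the duration of the remote log read and the previous value is restored afterwards. The connection id and URI below are assumptions based on the LocalStack setup in this patch (aws_s3_logs), and the helper name _temporary_conn_env is illustrative only, not part of the patch.

import contextlib
import os


@contextlib.contextmanager
def _temporary_conn_env(conn_id: str, conn_uri: str):
    """Expose a connection URI as AIRFLOW_CONN_<CONN_ID>, restoring the previous value on exit."""
    env_key = f"AIRFLOW_CONN_{conn_id.upper()}"
    old_value = os.getenv(env_key)
    try:
        os.environ[env_key] = conn_uri
        yield
    finally:
        if old_value is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = old_value


# Illustrative URI for the aws_s3_logs connection pointing at LocalStack (not a guaranteed Airflow URI format).
with _temporary_conn_env("aws_s3_logs", "aws://test:test@?endpoint_url=http%3A%2F%2Flocalstack%3A4566&region_name=us-east-1"):
    # While the block is active, anything resolving "aws_s3_logs" can find it in the environment.
    assert "AIRFLOW_CONN_AWS_S3_LOGS" in os.environ

# After the block, the variable is gone again (assuming it was not set beforehand).
assert "AIRFLOW_CONN_AWS_S3_LOGS" not in os.environ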