diff --git a/.github/workflows/additional-prod-image-tests.yml b/.github/workflows/additional-prod-image-tests.yml
index a6d0f890d1117..ec98cbb2d26e3 100644
--- a/.github/workflows/additional-prod-image-tests.yml
+++ b/.github/workflows/additional-prod-image-tests.yml
@@ -192,7 +192,7 @@ jobs:
       - name: "Run Task SDK integration tests"
         run: breeze testing task-sdk-integration-tests

-  test-e2e-integration-tests:
+  test-e2e-integration-tests-basic:
     name: "Test e2e integration tests with PROD image"
     uses: ./.github/workflows/airflow-e2e-tests.yml
     with:
@@ -200,3 +200,13 @@ jobs:
       platform: ${{ inputs.platform }}
       default-python-version: "${{ inputs.default-python-version }}"
       use-uv: ${{ inputs.use-uv }}
+
+  test-e2e-integration-tests-remote-log:
+    name: "Remote logging tests with PROD image"
+    uses: ./.github/workflows/airflow-e2e-tests.yml
+    with:
+      runners: ${{ inputs.runners }}
+      platform: ${{ inputs.platform }}
+      default-python-version: "${{ inputs.default-python-version }}"
+      use-uv: ${{ inputs.use-uv }}
+      e2e_test_mode: "remote_log"
diff --git a/.github/workflows/airflow-e2e-tests.yml b/.github/workflows/airflow-e2e-tests.yml
index 31e6758fdaaae..a540fba4d6f30 100644
--- a/.github/workflows/airflow-e2e-tests.yml
+++ b/.github/workflows/airflow-e2e-tests.yml
@@ -44,6 +44,10 @@ on: # yamllint disable-line rule:truthy
         description: "Tag of the Docker image to test"
         type: string
         required: true
+      e2e_test_mode:
+        description: "Test mode - basic or remote_log"
+        type: string
+        default: "basic"

   workflow_call:
     inputs:
@@ -67,6 +71,10 @@ on: # yamllint disable-line rule:truthy
         description: "Tag of the Docker image to test"
         type: string
         default: ""
+      e2e_test_mode:
+        description: "Test mode - basic or remote_log"
+        type: string
+        default: "basic"

 jobs:
   test-e2e-integration-tests:
@@ -101,14 +109,7 @@ jobs:
         run: breeze testing airflow-e2e-tests
         env:
           DOCKER_IMAGE: "${{ inputs.docker-image-tag }}"
-      - name: "Upload e2e test report"
-        uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
-        with:
-          name: e2e-test-report
-          path: './airflow-e2e-tests/_e2e_test_report.json'
-          retention-days: '7'
-          if-no-files-found: 'error'
-        if: always()
+          E2E_TEST_MODE: "${{ inputs.e2e_test_mode }}"
       - name: Zip logs
         run: |
           cd ./airflow-e2e-tests && zip -r logs.zip logs
@@ -116,7 +117,7 @@
       - name: "Upload logs"
         uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
         with:
-          name: e2e-test-logs
+          name: "e2e-test-logs-${{ inputs.e2e_test_mode }}"
           path: './airflow-e2e-tests/logs.zip'
           retention-days: '7'
           if-no-files-found: 'error'
diff --git a/airflow-core/docs/howto/docker-compose/docker-compose.yaml b/airflow-core/docs/howto/docker-compose/docker-compose.yaml
index 2c2a614c9ef72..3892e04414358 100644
--- a/airflow-core/docs/howto/docker-compose/docker-compose.yaml
+++ b/airflow-core/docs/howto/docker-compose/docker-compose.yaml
@@ -51,6 +51,8 @@ x-airflow-common:
   # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
   image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:|version|}
   # build: .
+ env_file: + - ${ENV_FILE_PATH:-.env} environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index 6d2caaa8a43a0..0b85a31a11784 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -72,6 +72,7 @@ from airflow.utils.types import DagRunType from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.db import clear_db_connections, clear_db_runs from tests_common.test_utils.file_task_handler import ( convert_list_to_stream, extract_events, @@ -86,6 +87,15 @@ FILE_TASK_HANDLER = "task" +@pytest.fixture(autouse=True) +def cleanup_tables(): + clear_db_runs() + clear_db_connections() + yield + clear_db_runs() + clear_db_connections() + + class TestFileTaskLogHandler: def clean_up(self): with create_session() as session: @@ -603,6 +613,72 @@ def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, se actual = h.handler.baseFilename assert actual == os.fspath(tmp_path / expected) + @skip_if_force_lowest_dependencies_marker + def test_read_remote_logs_with_real_s3_remote_log_io(self, create_task_instance, session): + """Test _read_remote_logs method using real S3RemoteLogIO with mock AWS""" + import tempfile + + import boto3 + from moto import mock_aws + + from airflow.models.connection import Connection + from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO + + def setup_mock_aws(): + """Set up mock AWS S3 bucket and connection.""" + s3_client = boto3.client("s3", region_name="us-east-1") + s3_client.create_bucket(Bucket="test-airflow-logs") + return s3_client + + with mock_aws(): + aws_conn = Connection( + conn_id="aws_s3_conn", + conn_type="aws", + login="test_access_key", + password="test_secret_key", + extra='{"region_name": "us-east-1"}', + ) + session.add(aws_conn) + session.commit() + s3_client = setup_mock_aws() + + ti = create_task_instance( + dag_id="test_dag_s3_remote_logs", + task_id="test_task_s3_remote_logs", + run_type=DagRunType.SCHEDULED, + logical_date=DEFAULT_DATE, + ) + ti.try_number = 1 + + with tempfile.TemporaryDirectory() as temp_dir: + s3_remote_log_io = S3RemoteLogIO( + remote_base="s3://test-airflow-logs/logs", + base_log_folder=temp_dir, + delete_local_copy=False, + ) + + with conf_vars({("logging", "REMOTE_LOG_CONN_ID"): "aws_s3_conn"}): + fth = FileTaskHandler("") + log_relative_path = fth._render_filename(ti, 1) + + log_content = "Log line 1 from S3\nLog line 2 from S3\nLog line 3 from S3" + s3_client.put_object( + Bucket="test-airflow-logs", + Key=f"logs/{log_relative_path}", + Body=log_content.encode("utf-8"), + ) + + import airflow.logging_config + + airflow.logging_config.REMOTE_TASK_LOG = s3_remote_log_io + + sources, logs = fth._read_remote_logs(ti, try_number=1) + + assert len(sources) > 0, f"Expected sources but got: {sources}" + assert len(logs) > 0, f"Expected logs but got: {logs}" + assert logs[0] == log_content + assert f"s3://test-airflow-logs/logs/{log_relative_path}" in sources[0] + @pytest.mark.parametrize("logical_date", ((None), (DEFAULT_DATE))) class TestFilenameRendering: diff --git a/airflow-e2e-tests/docker/localstack.yml b/airflow-e2e-tests/docker/localstack.yml new file mode 100644 index 0000000000000..4a3f87d37d789 --- /dev/null +++ b/airflow-e2e-tests/docker/localstack.yml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor 
license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +services: + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}" + image: localstack/localstack:4.7 + labels: + breeze.description: "Integration that emulates AWS services locally." + ports: + - "4566:4566" # LocalStack Gateway + - "4510-4559:4510-4559" # external services port range + environment: + # LocalStack configuration: https://docs.localstack.cloud/references/configuration/ + - DEBUG=${DEBUG:-0} + - SERVICES=s3 + - AWS_ACCESS_KEY_ID=test + - AWS_SECRET_ACCESS_KEY=test + - AWS_DEFAULT_REGION=us-east-1 + volumes: + - "./init-aws.sh:/etc/localstack/init/ready.d/init-aws.sh" + - "/var/run/docker.sock:/var/run/docker.sock" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 10s diff --git a/airflow-e2e-tests/pyproject.toml b/airflow-e2e-tests/pyproject.toml index 950b1545e9da5..67bac50b858f9 100644 --- a/airflow-e2e-tests/pyproject.toml +++ b/airflow-e2e-tests/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "apache-airflow-devel-common", "python-on-whales>=0.70.0", "testcontainers>=4.12.0", + "boto3", ] [tool.pytest.ini_options] diff --git a/airflow-e2e-tests/scripts/init-aws.sh b/airflow-e2e-tests/scripts/init-aws.sh new file mode 100755 index 0000000000000..ca5a1cfe0783e --- /dev/null +++ b/airflow-e2e-tests/scripts/init-aws.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +aws --endpoint-url=http://localstack:4566 s3 mb s3://test-airflow-logs +aws --endpoint-url=http://localstack:4566 s3 ls diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py index d63c17e87a909..c639bffa80fe8 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py @@ -26,10 +26,13 @@ from testcontainers.compose import DockerCompose from airflow_e2e_tests.constants import ( - AIRFLOW_ROOT_PATH, + AWS_INIT_PATH, DOCKER_COMPOSE_HOST_PORT, + DOCKER_COMPOSE_PATH, DOCKER_IMAGE, E2E_DAGS_FOLDER, + E2E_TEST_MODE, + LOCALSTACK_PATH, LOGS_FOLDER, TEST_REPORT_FILE, ) @@ -39,16 +42,33 @@ airflow_logs_path = None +def _setup_s3_integration(dot_env_file, tmp_dir): + copyfile(LOCALSTACK_PATH, tmp_dir / "localstack.yml") + + copyfile(AWS_INIT_PATH, tmp_dir / "init-aws.sh") + current_permissions = os.stat(tmp_dir / "init-aws.sh").st_mode + os.chmod(tmp_dir / "init-aws.sh", current_permissions | 0o111) + + dot_env_file.write_text( + f"AIRFLOW_UID={os.getuid()}\n" + "AWS_DEFAULT_REGION=us-east-1\n" + "AWS_ENDPOINT_URL_S3=http://localstack:4566\n" + "AIRFLOW__LOGGING__REMOTE_LOGGING=true\n" + "AIRFLOW_CONN_AWS_S3_LOGS=aws://test:test@\n" + "AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=aws_s3_logs\n" + "AIRFLOW__LOGGING__DELETE_LOCAL_LOGS=true\n" + "AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://test-airflow-logs\n" + ) + os.environ["ENV_FILE_PATH"] = str(dot_env_file) + + def spin_up_airflow_environment(tmp_path_factory): global compose_instance global airflow_logs_path tmp_dir = tmp_path_factory.mktemp("airflow-e2e-tests") - compose_file_path = ( - AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" / "docker-compose.yaml" - ) - - copyfile(compose_file_path, tmp_dir / "docker-compose.yaml") + console.print(f"[yellow]Using docker compose file: {DOCKER_COMPOSE_PATH}") + copyfile(DOCKER_COMPOSE_PATH, tmp_dir / "docker-compose.yaml") subfolders = ("dags", "logs", "plugins", "config") @@ -63,19 +83,23 @@ def spin_up_airflow_environment(tmp_path_factory): copytree(E2E_DAGS_FOLDER, tmp_dir / "dags", dirs_exist_ok=True) dot_env_file = tmp_dir / ".env" + dot_env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n") console.print(f"[yellow]Creating .env file :[/ {dot_env_file}") - dot_env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n") os.environ["AIRFLOW_IMAGE_NAME"] = DOCKER_IMAGE + compose_file_names = ["docker-compose.yaml"] + + if E2E_TEST_MODE == "remote_log": + compose_file_names.append("localstack.yml") + _setup_s3_integration(dot_env_file, tmp_dir) # If we are using the image from ghcr.io/apache/airflow/main we do not pull # as it is already available and loaded using prepare_breeze_and_image step in workflow pull = False if DOCKER_IMAGE.startswith("ghcr.io/apache/airflow/main/") else True console.print(f"[blue]Spinning up airflow environment using {DOCKER_IMAGE}") - - compose_instance = DockerCompose(tmp_dir, compose_file_name=["docker-compose.yaml"], pull=pull) + compose_instance = DockerCompose(tmp_dir, compose_file_name=compose_file_names, pull=pull) compose_instance.start() diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py index 54225059e0fa6..a208487da8b5d 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py @@ -27,7 +27,9 @@ DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE") or DEFAULT_DOCKER_IMAGE 
os.environ["AIRFLOW_UID"] = str(os.getuid()) -DOCKER_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" +DOCKER_COMPOSE_PATH = ( + AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" / "docker-compose.yaml" +) AIRFLOW_WWW_USER_USERNAME = os.environ.get("_AIRFLOW_WWW_USER_USERNAME", "airflow") AIRFLOW_WWW_USER_PASSWORD = os.environ.get("_AIRFLOW_WWW_USER_PASSWORD", "airflow") @@ -37,3 +39,6 @@ # The logs folder where the Airflow logs will be copied to and uploaded to github artifacts LOGS_FOLDER = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "logs" TEST_REPORT_FILE = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "_e2e_test_report.json" +LOCALSTACK_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "localstack.yml" +E2E_TEST_MODE = os.environ.get("E2E_TEST_MODE", "basic") +AWS_INIT_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "scripts" / "init-aws.sh" diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py index 6d7757263475a..32abeb5331d78 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py @@ -121,6 +121,13 @@ def trigger_dag_and_wait(self, dag_id: str, json=None): run_id=resp["dag_run_id"], ) + def get_task_logs(self, dag_id: str, run_id: str, task_id: str, try_number: int = 1): + """Get task logs via API.""" + return self._make_request( + method="GET", + endpoint=f"dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs/{try_number}", + ) + class TaskSDKClient: """Client for interacting with the Task SDK API.""" diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/__init__.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py new file mode 100644 index 0000000000000..265bfca6bbe88 --- /dev/null +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+import boto3
+import pytest
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+
+class TestRemoteLogging:
+    airflow_client = AirflowClient()
+    dag_id = "example_xcom_test"
+
+    def test_dag_unpause(self):
+        self.airflow_client.un_pause_dag(
+            TestRemoteLogging.dag_id,
+        )
+
+    def test_remote_logging_s3(self):
+        """Test that a DAG using remote logging to S3 completes successfully."""
+
+        self.airflow_client.un_pause_dag(TestRemoteLogging.dag_id)
+
+        resp = self.airflow_client.trigger_dag(
+            TestRemoteLogging.dag_id, json={"logical_date": datetime.now(timezone.utc).isoformat()}
+        )
+        state = self.airflow_client.wait_for_dag_run(
+            dag_id=TestRemoteLogging.dag_id,
+            run_id=resp["dag_run_id"],
+        )
+
+        assert state == "success", (
+            f"DAG {TestRemoteLogging.dag_id} did not complete successfully. Final state: {state}"
+        )
+
+        task_logs = self.airflow_client.get_task_logs(
+            dag_id=TestRemoteLogging.dag_id,
+            task_id="bash_pull",
+            run_id=resp["dag_run_id"],
+        )
+
+        task_log_sources = [
+            source for content in task_logs.get("content", [{}]) for source in content.get("sources", [])
+        ]
+
+        s3_client = boto3.client(
+            "s3",
+            endpoint_url="http://localhost:4566",
+            aws_access_key_id="test",
+            aws_secret_access_key="test",
+            region_name="us-east-1",
+        )
+
+        # This bucket is created by the LocalStack init script (scripts/init-aws.sh) as part of the docker compose setup
+        bucket_name = "test-airflow-logs"
+        response = s3_client.list_objects_v2(Bucket=bucket_name)
+
+        if "Contents" not in response:
+            pytest.fail(f"No objects found in S3 bucket {bucket_name}")
+
+        # s3 key format: dag_id=example_xcom/run_id=manual__2025-09-29T23:32:09.457215+00:00/task_id=bash_pull/attempt=1.log
+
+        log_files = [f"s3://{bucket_name}/{obj['Key']}" for obj in response["Contents"]]
+        assert any(source in log_files for source in task_log_sources), (
+            f"None of the log sources {task_log_sources} were found in S3 bucket logs {log_files}"
+        )
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
index 9abcda1d332f1..7fc70e474b999 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
@@ -1,4 +1,4 @@
-
+
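
Note on local runs: the new docker/localstack.yml service declares a healthcheck against http://localhost:4566/_localstack/health, and the remote-log tests only make sense once the emulated S3 service is up. When iterating on the suite locally, a small wait loop such as the one below can be used before triggering DAGs. This is a minimal sketch, not part of the change set; it assumes `requests` is available in the test environment, and the helper name and retry policy are illustrative.

```python
from __future__ import annotations

import time

import requests

# Health endpoint and port taken from docker/localstack.yml in this PR.
LOCALSTACK_HEALTH_URL = "http://localhost:4566/_localstack/health"


def wait_for_localstack(timeout: float = 120.0, interval: float = 5.0) -> None:
    """Block until LocalStack reports its S3 service as up, or time out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            resp = requests.get(LOCALSTACK_HEALTH_URL, timeout=5)
            # LocalStack reports per-service states such as "available" or "running".
            if resp.ok and resp.json().get("services", {}).get("s3") in ("available", "running"):
                return
        except requests.RequestException:
            pass  # the container may still be starting
        time.sleep(interval)
    raise TimeoutError(f"LocalStack did not become healthy at {LOCALSTACK_HEALTH_URL}")
```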
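
Debugging note: the conftest writes `AWS_ENDPOINT_URL_S3=http://localstack:4566`, the `aws://test:test@` connection, and `s3://test-airflow-logs` into the generated `.env` file, so when the assertion on log sources fails it is often quickest to look at what actually landed in the bucket. The snippet below is a rough sketch for local inspection only (it is not part of the PR); the endpoint, credentials, and bucket name are the ones used by the compose setup above.

```python
import boto3

# Same LocalStack endpoint and dummy credentials as in docker/localstack.yml
# and the .env file written by _setup_s3_integration.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)

# Keys follow the FileTaskHandler layout, e.g.
# dag_id=.../run_id=.../task_id=.../attempt=1.log under s3://test-airflow-logs
for obj in s3.list_objects_v2(Bucket="test-airflow-logs").get("Contents", []):
    print(obj["Key"], obj["Size"])
```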