12 changes: 11 additions & 1 deletion .github/workflows/additional-prod-image-tests.yml
@@ -192,11 +192,21 @@ jobs:
- name: "Run Task SDK integration tests"
run: breeze testing task-sdk-integration-tests

test-e2e-integration-tests:
test-e2e-integration-tests-basic:
name: "Test e2e integration tests with PROD image"
uses: ./.github/workflows/airflow-e2e-tests.yml
with:
runners: ${{ inputs.runners }}
platform: ${{ inputs.platform }}
default-python-version: "${{ inputs.default-python-version }}"
use-uv: ${{ inputs.use-uv }}

test-e2e-integration-tests-remote-log:
name: "Remote logging tests with PROD image"
uses: ./.github/workflows/airflow-e2e-tests.yml
with:
runners: ${{ inputs.runners }}
platform: ${{ inputs.platform }}
default-python-version: "${{ inputs.default-python-version }}"
use-uv: ${{ inputs.use-uv }}
e2e_test_mode: "remote_log"
19 changes: 10 additions & 9 deletions .github/workflows/airflow-e2e-tests.yml
@@ -44,6 +44,10 @@ on:  # yamllint disable-line rule:truthy
description: "Tag of the Docker image to test"
type: string
required: true
e2e_test_mode:
description: "Test mode - basic or remote_log"
type: string
default: "basic"

workflow_call:
inputs:
@@ -67,6 +71,10 @@ on:  # yamllint disable-line rule:truthy
description: "Tag of the Docker image to test"
type: string
default: ""
e2e_test_mode:
description: "Test mode - quick or full"
type: string
default: "basic"

jobs:
test-e2e-integration-tests:
@@ -101,22 +109,15 @@ jobs:
run: breeze testing airflow-e2e-tests
env:
DOCKER_IMAGE: "${{ inputs.docker-image-tag }}"
- name: "Upload e2e test report"
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: e2e-test-report
path: './airflow-e2e-tests/_e2e_test_report.json'
retention-days: '7'
if-no-files-found: 'error'
if: always()
E2E_TEST_MODE: "${{ inputs.e2e_test_mode }}"
- name: Zip logs
run: |
cd ./airflow-e2e-tests && zip -r logs.zip logs
if: always()
- name: "Upload logs"
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: e2e-test-logs
name: "e2e-test-logs-${{ inputs.e2e_test_mode }}"
path: './airflow-e2e-tests/logs.zip'
retention-days: '7'
if-no-files-found: 'error'
2 changes: 2 additions & 0 deletions airflow-core/docs/howto/docker-compose/docker-compose.yaml
@@ -51,6 +51,8 @@ x-airflow-common:
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:|version|}
# build: .
env_file:
- ${ENV_FILE_PATH:-.env}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
76 changes: 76 additions & 0 deletions airflow-core/tests/unit/utils/test_log_handlers.py
@@ -72,6 +72,7 @@
from airflow.utils.types import DagRunType

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_connections, clear_db_runs
from tests_common.test_utils.file_task_handler import (
convert_list_to_stream,
extract_events,
@@ -86,6 +87,15 @@
FILE_TASK_HANDLER = "task"


@pytest.fixture(autouse=True)
def cleanup_tables():
clear_db_runs()
clear_db_connections()
yield
clear_db_runs()
clear_db_connections()


class TestFileTaskLogHandler:
def clean_up(self):
with create_session() as session:
@@ -603,6 +613,72 @@ def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, se
actual = h.handler.baseFilename
assert actual == os.fspath(tmp_path / expected)

@skip_if_force_lowest_dependencies_marker
def test_read_remote_logs_with_real_s3_remote_log_io(self, create_task_instance, session):
"""Test _read_remote_logs method using real S3RemoteLogIO with mock AWS"""
import tempfile

import boto3
from moto import mock_aws

from airflow.models.connection import Connection
from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO

def setup_mock_aws():
"""Set up mock AWS S3 bucket and connection."""
s3_client = boto3.client("s3", region_name="us-east-1")
s3_client.create_bucket(Bucket="test-airflow-logs")
return s3_client

with mock_aws():
aws_conn = Connection(
conn_id="aws_s3_conn",
conn_type="aws",
login="test_access_key",
password="test_secret_key",
extra='{"region_name": "us-east-1"}',
)
session.add(aws_conn)
session.commit()
s3_client = setup_mock_aws()

ti = create_task_instance(
dag_id="test_dag_s3_remote_logs",
task_id="test_task_s3_remote_logs",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
ti.try_number = 1

with tempfile.TemporaryDirectory() as temp_dir:
s3_remote_log_io = S3RemoteLogIO(
remote_base="s3://test-airflow-logs/logs",
base_log_folder=temp_dir,
delete_local_copy=False,
)

with conf_vars({("logging", "REMOTE_LOG_CONN_ID"): "aws_s3_conn"}):
fth = FileTaskHandler("")
log_relative_path = fth._render_filename(ti, 1)

log_content = "Log line 1 from S3\nLog line 2 from S3\nLog line 3 from S3"
s3_client.put_object(
Bucket="test-airflow-logs",
Key=f"logs/{log_relative_path}",
Body=log_content.encode("utf-8"),
)

import airflow.logging_config

airflow.logging_config.REMOTE_TASK_LOG = s3_remote_log_io

sources, logs = fth._read_remote_logs(ti, try_number=1)

assert len(sources) > 0, f"Expected sources but got: {sources}"
assert len(logs) > 0, f"Expected logs but got: {logs}"
assert logs[0] == log_content
assert f"s3://test-airflow-logs/logs/{log_relative_path}" in sources[0]


@pytest.mark.parametrize("logical_date", ((None), (DEFAULT_DATE)))
class TestFilenameRendering:
42 changes: 42 additions & 0 deletions airflow-e2e-tests/docker/localstack.yml
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
---
services:
localstack:
container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}"
image: localstack/localstack:4.7
labels:
breeze.description: "Integration that emulates AWS services locally."
ports:
- "4566:4566" # LocalStack Gateway
- "4510-4559:4510-4559" # external services port range
environment:
# LocalStack configuration: https://docs.localstack.cloud/references/configuration/
- DEBUG=${DEBUG:-0}
- SERVICES=s3
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=test
- AWS_DEFAULT_REGION=us-east-1
volumes:
- "./init-aws.sh:/etc/localstack/init/ready.d/init-aws.sh"
- "/var/run/docker.sock:/var/run/docker.sock"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"]
interval: 10s
timeout: 5s
retries: 10
start_period: 10s
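
The LocalStack service above emulates S3 on port 4566 with the static "test" credentials used throughout this change. As a minimal sketch (not part of the diff; it assumes LocalStack is running with the port mapping above and that init-aws.sh has executed), the log bucket can be checked from the host with boto3:

import boto3

# Point boto3 at the LocalStack gateway exposed on localhost:4566 above.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)

# After the ready.d init script has run, the remote-log bucket should be listed.
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])  # expect "test-airflow-logs"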
1 change: 1 addition & 0 deletions airflow-e2e-tests/pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
"apache-airflow-devel-common",
"python-on-whales>=0.70.0",
"testcontainers>=4.12.0",
"boto3",
]

[tool.pytest.ini_options]
20 changes: 20 additions & 0 deletions airflow-e2e-tests/scripts/init-aws.sh
@@ -0,0 +1,20 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Create the bucket used for remote task logs, then list buckets to confirm the setup.
aws --endpoint-url=http://localstack:4566 s3 mb s3://test-airflow-logs
aws --endpoint-url=http://localstack:4566 s3 ls
42 changes: 33 additions & 9 deletions airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -26,10 +26,13 @@
from testcontainers.compose import DockerCompose

from airflow_e2e_tests.constants import (
AIRFLOW_ROOT_PATH,
AWS_INIT_PATH,
DOCKER_COMPOSE_HOST_PORT,
DOCKER_COMPOSE_PATH,
DOCKER_IMAGE,
E2E_DAGS_FOLDER,
E2E_TEST_MODE,
LOCALSTACK_PATH,
LOGS_FOLDER,
TEST_REPORT_FILE,
)
@@ -39,16 +42,33 @@
airflow_logs_path = None


def _setup_s3_integration(dot_env_file, tmp_dir):
copyfile(LOCALSTACK_PATH, tmp_dir / "localstack.yml")

copyfile(AWS_INIT_PATH, tmp_dir / "init-aws.sh")
current_permissions = os.stat(tmp_dir / "init-aws.sh").st_mode
os.chmod(tmp_dir / "init-aws.sh", current_permissions | 0o111)

dot_env_file.write_text(
f"AIRFLOW_UID={os.getuid()}\n"
"AWS_DEFAULT_REGION=us-east-1\n"
"AWS_ENDPOINT_URL_S3=http://localstack:4566\n"
"AIRFLOW__LOGGING__REMOTE_LOGGING=true\n"
"AIRFLOW_CONN_AWS_S3_LOGS=aws://test:test@\n"
"AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=aws_s3_logs\n"
"AIRFLOW__LOGGING__DELETE_LOCAL_LOGS=true\n"
"AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://test-airflow-logs\n"
)
os.environ["ENV_FILE_PATH"] = str(dot_env_file)


def spin_up_airflow_environment(tmp_path_factory):
global compose_instance
global airflow_logs_path
tmp_dir = tmp_path_factory.mktemp("airflow-e2e-tests")

compose_file_path = (
AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" / "docker-compose.yaml"
)

copyfile(compose_file_path, tmp_dir / "docker-compose.yaml")
console.print(f"[yellow]Using docker compose file: {DOCKER_COMPOSE_PATH}")
copyfile(DOCKER_COMPOSE_PATH, tmp_dir / "docker-compose.yaml")

subfolders = ("dags", "logs", "plugins", "config")

@@ -63,19 +83,23 @@ def spin_up_airflow_environment(tmp_path_factory):
copytree(E2E_DAGS_FOLDER, tmp_dir / "dags", dirs_exist_ok=True)

dot_env_file = tmp_dir / ".env"
dot_env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n")

console.print(f"[yellow]Creating .env file :[/ {dot_env_file}")

dot_env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n")
os.environ["AIRFLOW_IMAGE_NAME"] = DOCKER_IMAGE
compose_file_names = ["docker-compose.yaml"]

if E2E_TEST_MODE == "remote_log":
compose_file_names.append("localstack.yml")
_setup_s3_integration(dot_env_file, tmp_dir)

# If we are using the image from ghcr.io/apache/airflow/main we do not pull
# as it is already available and loaded using prepare_breeze_and_image step in workflow
pull = False if DOCKER_IMAGE.startswith("ghcr.io/apache/airflow/main/") else True

console.print(f"[blue]Spinning up airflow environment using {DOCKER_IMAGE}")

compose_instance = DockerCompose(tmp_dir, compose_file_name=["docker-compose.yaml"], pull=pull)
compose_instance = DockerCompose(tmp_dir, compose_file_name=compose_file_names, pull=pull)

compose_instance.start()

7 changes: 6 additions & 1 deletion airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -27,7 +27,9 @@
DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE") or DEFAULT_DOCKER_IMAGE
os.environ["AIRFLOW_UID"] = str(os.getuid())

DOCKER_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose"
DOCKER_COMPOSE_PATH = (
AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "howto" / "docker-compose" / "docker-compose.yaml"
)

AIRFLOW_WWW_USER_USERNAME = os.environ.get("_AIRFLOW_WWW_USER_USERNAME", "airflow")
AIRFLOW_WWW_USER_PASSWORD = os.environ.get("_AIRFLOW_WWW_USER_PASSWORD", "airflow")
@@ -37,3 +39,6 @@
# The logs folder where the Airflow logs will be copied to and uploaded to github artifacts
LOGS_FOLDER = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "logs"
TEST_REPORT_FILE = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "_e2e_test_report.json"
LOCALSTACK_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "localstack.yml"
E2E_TEST_MODE = os.environ.get("E2E_TEST_MODE", "basic")
AWS_INIT_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "scripts" / "init-aws.sh"
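
E2E_TEST_MODE and DOCKER_IMAGE are read from the environment above, so the remote-logging suite can also be driven outside CI. A rough local-run sketch (the image tag and paths are placeholders inferred from this diff, not a documented interface):

import os
import subprocess

# Placeholders: any locally available PROD image tag should work; the path below
# mirrors the airflow-e2e-tests layout shown elsewhere in this diff.
os.environ["E2E_TEST_MODE"] = "remote_log"
os.environ["DOCKER_IMAGE"] = "ghcr.io/apache/airflow/main/prod/python3.10:latest"

subprocess.run(["pytest", "tests/airflow_e2e_tests"], cwd="airflow-e2e-tests", check=True)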
@@ -121,6 +121,13 @@ def trigger_dag_and_wait(self, dag_id: str, json=None):
run_id=resp["dag_run_id"],
)

def get_task_logs(self, dag_id: str, run_id: str, task_id: str, try_number: int = 1):
"""Get task logs via API."""
return self._make_request(
method="GET",
endpoint=f"dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs/{try_number}",
)


class TaskSDKClient:
"""Client for interacting with the Task SDK API."""
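
For context, a remote-logging e2e test could call the get_task_logs helper added above roughly like this hypothetical sketch ("client" stands for an instance of the API client class the method belongs to; only the method signature comes from the diff):

# Hypothetical usage: the dag/run/task identifiers are whatever the calling test triggered.
response = client.get_task_logs(
    dag_id="example_dag",
    run_id="manual__2025-01-01T00:00:00+00:00",
    task_id="example_task",
    try_number=1,
)
# A remote-log test would then assert that the log lines written to the configured
# S3 bucket appear in this response.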
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.