Skip to content

Commit

Permalink
Run tests for Providers also for Airflow 2.8
Browse files Browse the repository at this point in the history
This is a follow-up on apache#39513 to add support for running Provider
tests against Airlfow 2.8 installed from PyPI.
  • Loading branch information
potiuk committed May 22, 2024
1 parent 39269d6 commit c149fbd
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 166 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/check-providers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,10 @@ jobs:
Remove incompatible Airflow
${{ matrix.airflow-version }}:Python ${{ matrix.python-version }} provider packages
run: |
rm -vf ${{ matrix.remove-providers }}
working-directory: ./dist
for provider in ${{ matrix.remove-providers }}; do
echo "Removing incompatible provider: ${provider}"
rm -vf dist/apache_airflow_providers_${provider/./_}*
done
if: matrix.remove-providers != ''
- name: "Download airflow package: wheel"
run: |
Expand Down
6 changes: 6 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,12 @@ function determine_airflow_to_use() {
python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
# Some packages might leave legacy typing module which causes test issues
pip uninstall -y typing || true
# Upgrade pytest and pytest extensions to latest version if they have been accidentally
# downgraded by constraints
pip install --upgrade pytest pytest aiofiles aioresponses pytest-asyncio pytest-custom-exit-code \
pytest-icdiff pytest-instafail pytest-mock pytest-rerunfailures pytest-timeouts \
pytest-xdist pytest requests_mock time-machine \
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-main/constraints-${PYTHON_MAJOR_MINOR_VERSION}.txt
fi

if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
Expand Down
3 changes: 3 additions & 0 deletions airflow/providers/common/io/operators/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class FileTransferOperator(BaseOperator):
:param source_conn_id: The optional source connection id.
:param dest_conn_id: The optional destination connection id.
Note that open-lineage integration for FileTransferOperator requires Airflow 2.9+ even if file
transfer operator should work with Airflow 2.8+, due to its reliance on universal_pathlib >= 0.2.2
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:FileTransferOperator`
Expand Down
78 changes: 39 additions & 39 deletions dev/breeze/doc/images/output_testing_db-tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_testing_db-tests.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
31bee62efc24fa61aa868a0643e0db6b
17d0216889e996fe5fd813e0f1c76af6
72 changes: 36 additions & 36 deletions dev/breeze/doc/images/output_testing_non-db-tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_testing_non-db-tests.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
543f9814b475e511749fdebf29d16298
7f335b6d8225b8fb373b698c38bb86cf
92 changes: 46 additions & 46 deletions dev/breeze/doc/images/output_testing_tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_testing_tests.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3c3217a7eceaa77718af4876622e1b0f
68d630703517818c928daf2afe847a79
4 changes: 2 additions & 2 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def _verify_parallelism_parameters(
)
option_skip_providers = click.option(
"--skip-providers",
help="Coma separated list of providers to skip when running tests",
help="Space-separated list of provider ids to skip when running tests",
type=str,
default="",
envvar="SKIP_PROVIDERS",
Expand Down Expand Up @@ -749,7 +749,7 @@ def _run_test_command(
if skip_providers:
ignored_path_list = [
f"--ignore=tests/providers/{provider_id.replace('.','/')}"
for provider_id in skip_providers.split(",")
for provider_id in skip_providers.split(" ")
]
extra_pytest_args = (*extra_pytest_args, *ignored_path_list)
if run_in_parallel:
Expand Down
19 changes: 6 additions & 13 deletions dev/breeze/src/airflow_breeze/global_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import Iterable

from airflow_breeze.utils.host_info_utils import Architecture
from airflow_breeze.utils.path_utils import AIRFLOW_SOURCES_ROOT
Expand Down Expand Up @@ -465,29 +464,23 @@ def get_airflow_extras():
CHICKEN_EGG_PROVIDERS = " ".join([])


def _exclusion(providers: Iterable[str]) -> str:
return " ".join(
[f"apache_airflow_providers_{provider.replace('.', '_').replace('-','_')}*" for provider in providers]
)


BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str]] = [
BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str | list[str]]] = [
{
"python-version": "3.8",
"airflow-version": "2.7.1",
"remove-providers": _exclusion(["common.io", "fab"]),
"remove-providers": "common.io fab",
"run-tests": "false",
},
{
"python-version": "3.8",
"airflow-version": "2.8.0",
"remove-providers": _exclusion(["fab"]),
"run-tests": "false",
"airflow-version": "2.8.4",
"remove-providers": "fab",
"run-tests": "true",
},
{
"python-version": "3.8",
"airflow-version": "2.9.1",
"remove-providers": _exclusion([]),
"remove-providers": "",
"run-tests": "true",
},
]
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ combine-as-imports = true
"airflow/api/auth/backend/kerberos_auth.py" = ["E402"]
"airflow/security/kerberos.py" = ["E402"]
"airflow/security/utils.py" = ["E402"]
"tests/providers/common/io/xcom/test_backend.py" = ["E402"]
"tests/providers/elasticsearch/log/elasticmock/__init__.py" = ["E402"]
"tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py" = ["E402"]
"tests/providers/google/cloud/hooks/vertex_ai/test_batch_prediction_job.py" = ["E402"]
Expand Down
6 changes: 6 additions & 0 deletions scripts/docker/entrypoint_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ function determine_airflow_to_use() {
python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
# Some packages might leave legacy typing module which causes test issues
pip uninstall -y typing || true
# Upgrade pytest and pytest extensions to latest version if they have been accidentally
# downgraded by constraints
pip install --upgrade pytest pytest aiofiles aioresponses pytest-asyncio pytest-custom-exit-code \
pytest-icdiff pytest-instafail pytest-mock pytest-rerunfailures pytest-timeouts \
pytest-xdist pytest requests_mock time-machine \
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-main/constraints-${PYTHON_MAJOR_MINOR_VERSION}.txt
fi

if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
Expand Down
17 changes: 14 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,10 +1005,14 @@ def create_dag(
with_dagrun_type=DagRunType.SCHEDULED,
**kwargs,
):
op_kwargs = {}
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

if AIRFLOW_V_2_9_PLUS:
op_kwargs["task_display_name"] = task_display_name
with dag_maker(dag_id, **kwargs) as dag:
op = EmptyOperator(
task_id=task_id,
task_display_name=task_display_name,
max_active_tis_per_dag=max_active_tis_per_dag,
max_active_tis_per_dagrun=max_active_tis_per_dagrun,
executor_config=executor_config or {},
Expand All @@ -1019,6 +1023,7 @@ def create_dag(
email=email,
pool=pool,
trigger_rule=trigger_rule,
**op_kwargs,
)
if with_dagrun_type is not None:
dag_maker.create_dagrun(run_type=with_dagrun_type)
Expand Down Expand Up @@ -1170,11 +1175,17 @@ def reset_logging_config():
def suppress_info_logs_for_dag_and_fab():
import logging

from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

dag_logger = logging.getLogger("airflow.models.dag")
dag_logger.setLevel(logging.WARNING)

fab_logger = logging.getLogger("airflow.providers.fab.auth_manager.security_manager.override")
fab_logger.setLevel(logging.WARNING)
if AIRFLOW_V_2_9_PLUS:
fab_logger = logging.getLogger("airflow.providers.fab.auth_manager.security_manager.override")
fab_logger.setLevel(logging.WARNING)
else:
fab_logger = logging.getLogger("airflow.www.fab_security")
fab_logger.setLevel(logging.WARNING)


@pytest.fixture(scope="module", autouse=True)
Expand Down
1 change: 0 additions & 1 deletion tests/providers/common/io/operators/test_file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def test_get_openlineage_facets_on_start():

expected_input = Dataset(namespace=f"s3://{src_bucket}", name=src_key)
expected_output = Dataset(namespace=f"s3://{dst_bucket}", name=dst_key)

op = FileTransferOperator(
task_id="test",
src=f"s3://{src_bucket}/{src_key}",
Expand Down
17 changes: 14 additions & 3 deletions tests/providers/common/io/xcom/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,28 @@

import pytest

from airflow.exceptions import AirflowOptionalProviderFeatureException
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

pytestmark = [
pytest.mark.db_test,
pytest.mark.skipif(not AIRFLOW_V_2_9_PLUS, reason="Tests for Airflow 2.9.0+ only"),
]


import airflow.models.xcom
from airflow.models.xcom import BaseXCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend

try:
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
except AirflowOptionalProviderFeatureException:
pass
from airflow.utils import timezone
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils import db
from tests.test_utils.config import conf_vars

pytestmark = pytest.mark.db_test


@pytest.fixture(autouse=True)
def reset_db():
Expand Down
37 changes: 22 additions & 15 deletions tests/providers/google/cloud/log/test_stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
from google.cloud.logging import Resource
from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse, LogEntry

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.providers.google.cloud.log.stackdriver_task_handler import StackdriverTaskHandler
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs

Expand Down Expand Up @@ -81,21 +83,26 @@ def test_should_use_configured_log_name(mock_client, mock_get_creds_and_project_
mock_get_creds_and_project_id.return_value = ("creds", "project_id")

try:
with conf_vars(
{
("logging", "remote_logging"): "True",
("logging", "remote_base_log_folder"): "stackdriver://host/path",
}
):
importlib.reload(airflow_local_settings)
settings.configure_logging()

logger = logging.getLogger("airflow.task")
handler = logger.handlers[0]
assert isinstance(handler, StackdriverTaskHandler)
with mock.patch.object(handler, "transport_type") as transport_type_mock:
logger.error("foo")
transport_type_mock.assert_called_once_with(mock_client.return_value, "path")
# this is needed for Airflow 2.8 and below where default settings are triggering warning on
# extra "name" in the configuration of stackdriver handler. As of Airflow 2.9 this warning is not
# emitted.
expected_warnings = () if AIRFLOW_V_2_9_PLUS else (RemovedInAirflow3Warning,)
with pytest.warns(expected_warning=expected_warnings):
with conf_vars(
{
("logging", "remote_logging"): "True",
("logging", "remote_base_log_folder"): "stackdriver://host/path",
}
):
importlib.reload(airflow_local_settings)
settings.configure_logging()

logger = logging.getLogger("airflow.task")
handler = logger.handlers[0]
assert isinstance(handler, StackdriverTaskHandler)
with mock.patch.object(handler, "transport_type") as transport_type_mock:
logger.error("foo")
transport_type_mock.assert_called_once_with(mock_client.return_value, "path")
finally:
importlib.reload(airflow_local_settings)
settings.configure_logging()
Expand Down
8 changes: 6 additions & 2 deletions tests/providers/smtp/notifications/test_smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance):
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="DAG dag - Task op - Run ID test in State None",
html_content=f"""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Run ID:</td>\n <td>test</td>\n </tr>\n <tr>\n <td>Try:</td>\n <td>{NUM_TRY} of 1</td>\n </tr>\n <tr>\n <td>Task State:</td>\n <td>None</td>\n </tr>\n <tr>\n <td>Host:</td>\n <td></td>\n </tr>\n <tr>\n <td>Log Link:</td>\n <td><a href="http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs" style="text-decoration:underline;">http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs</a></td>\n </tr>\n <tr>\n <td>Mark Success Link:</td>\n <td><a href="http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success" style="text-decoration:underline;">http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success</a></td>\n </tr>\n \n </table>\n</body>\n</html>""",
html_content=mock.ANY,
smtp_conn_id="smtp_default",
files=None,
cc=None,
Expand All @@ -142,6 +142,8 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance):
mime_charset="utf-8",
custom_headers=None,
)
content = mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"]
assert f"{NUM_TRY} of 1" in content

@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
Expand All @@ -163,7 +165,7 @@ def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="SLA Missed for DAG test_notifier - Task op",
html_content="""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Dag:</td>\n <td>test_notifier</td>\n </tr>\n <tr>\n <td>Task List:</td>\n <td>[]</td>\n </tr>\n <tr>\n <td>Blocking Task List:</td>\n <td>[]</td>\n </tr>\n <tr>\n <td>SLAs:</td>\n <td>[(\'test_notifier\', \'op\', \'2018-01-01T00:00:00+00:00\')]</td>\n </tr>\n <tr>\n <td>Blocking TI\'s</td>\n <td>[]</td>\n </tr>\n \n </table>\n</body>\n</html>""",
html_content=mock.ANY,
smtp_conn_id="smtp_default",
files=None,
cc=None,
Expand All @@ -172,6 +174,8 @@ def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
mime_charset="utf-8",
custom_headers=None,
)
content = mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"]
assert "Task List:" in content

@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_task_instance):
Expand Down
11 changes: 10 additions & 1 deletion tests/test_utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,16 @@
TaskOutletDatasetReference,
)
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role

try:
from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role
except ImportError:
# Handle Pre-airflow 2.9 case where FAB was part of the core airflow
from airflow.auth.managers.fab.models import ( # type: ignore[no-redef]
Permission,
Resource,
assoc_permission_role,
)
from airflow.security.permissions import RESOURCE_DAG_PREFIX
from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections, reflect_tables
from airflow.utils.session import create_session
Expand Down

0 comments on commit c149fbd

Please sign in to comment.