diff --git a/airflow-core/tests/unit/listeners/class_listener.py b/airflow-core/tests/unit/listeners/class_listener.py index de235abbd401e..49a018ea9d838 100644 --- a/airflow-core/tests/unit/listeners/class_listener.py +++ b/airflow-core/tests/unit/listeners/class_listener.py @@ -20,7 +20,7 @@ from airflow.listeners import hookimpl from airflow.utils.state import DagRunState, TaskInstanceState -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: @@ -64,8 +64,7 @@ def on_dag_run_success(self, dag_run, msg: str): @hookimpl def on_dag_run_failed(self, dag_run, msg: str): self.state.append(DagRunState.FAILED) - -elif AIRFLOW_V_2_10_PLUS: +else: class ClassBasedListener: # type: ignore[no-redef] def __init__(self): @@ -95,36 +94,6 @@ def on_task_instance_success(self, previous_state, task_instance): @hookimpl def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException): self.state.append(TaskInstanceState.FAILED) -else: - - class ClassBasedListener: # type: ignore[no-redef] - def __init__(self): - self.started_component = None - self.stopped_component = None - self.state = [] - - @hookimpl - def on_starting(self, component): - self.started_component = component - self.state.append(DagRunState.RUNNING) - - @hookimpl - def before_stopping(self, component): - global stopped_component - stopped_component = component - self.state.append(DagRunState.SUCCESS) - - @hookimpl - def on_task_instance_running(self, previous_state, task_instance, session): - self.state.append(TaskInstanceState.RUNNING) - - @hookimpl - def on_task_instance_success(self, previous_state, task_instance, session): - self.state.append(TaskInstanceState.SUCCESS) - - @hookimpl - def on_task_instance_failed(self, previous_state, task_instance, session): - self.state.append(TaskInstanceState.FAILED) def clear(): diff --git a/devel-common/src/tests_common/test_utils/compat.py b/devel-common/src/tests_common/test_utils/compat.py index 59e5fbdaf1a86..c5eef051bda0d 100644 --- a/devel-common/src/tests_common/test_utils/compat.py +++ b/devel-common/src/tests_common/test_utils/compat.py @@ -23,8 +23,6 @@ from airflow.exceptions import AirflowOptionalProviderFeatureException from airflow.utils.helpers import prune_dict -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS - try: # ImportError has been renamed to ParseImportError in airflow 2.10.0, and since our provider tests should # run on all supported versions of Airflow, this compatibility shim falls back to the old ImportError so @@ -86,35 +84,27 @@ except ModuleNotFoundError: # dataset is renamed to asset since Airflow 3.0 from airflow.models.dataset import ( + DagScheduleDatasetAliasReference as DagScheduleAssetAliasReference, DagScheduleDatasetReference as DagScheduleAssetReference, + DatasetAliasModel as AssetAliasModel, DatasetDagRunQueue as AssetDagRunQueue, DatasetEvent as AssetEvent, DatasetModel as AssetModel, TaskOutletDatasetReference as TaskOutletAssetReference, ) - if AIRFLOW_V_2_10_PLUS: - from airflow.models.dataset import ( - DagScheduleDatasetAliasReference as DagScheduleAssetAliasReference, - DatasetAliasModel as AssetAliasModel, - ) - def deserialize_operator(serialized_operator: dict[str, Any]) -> Operator: - if AIRFLOW_V_2_10_PLUS: - # In airflow 2.10+ we can deserialize operator using regular deserialize method. 
- # We do not need to use deserialize_operator method explicitly but some tests are deserializing the - # operator and in the future they could use regular ``deserialize`` method. This method is a shim - # to make deserialization of operator works for tests run against older Airflow versions and tests - # should use that method instead of calling ``BaseSerialization.deserialize`` directly. - # We can remove this method and switch to the regular ``deserialize`` method as long as all providers - # are updated to airflow 2.10+. - from airflow.serialization.serialized_objects import BaseSerialization - - return BaseSerialization.deserialize(serialized_operator) - from airflow.serialization.serialized_objects import SerializedBaseOperator - - return SerializedBaseOperator.deserialize_operator(serialized_operator) + # In airflow 2.10+ we can deserialize an operator using the regular ``deserialize`` method. + # We do not need to use deserialize_operator method explicitly but some tests are deserializing the + # operator and in the future they could use the regular ``deserialize`` method. This method is a shim + # to make deserialization of operators work for tests run against older Airflow versions and tests + # should use that method instead of calling ``BaseSerialization.deserialize`` directly. + # We can remove this method and switch to the regular ``deserialize`` method once all providers + # are updated to airflow 2.10+. + from airflow.serialization.serialized_objects import BaseSerialization + + return BaseSerialization.deserialize(serialized_operator) def connection_to_dict( diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py index 4ab88d1c75e4c..c5711ec514c05 100644 --- a/devel-common/src/tests_common/test_utils/db.py +++ b/devel-common/src/tests_common/test_utils/db.py @@ -51,7 +51,7 @@ ParseImportError, TaskOutletAssetReference, ) -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from pathlib import Path @@ -159,11 +159,10 @@ def clear_db_assets(): session.query(AssetDagRunQueue).delete() session.query(DagScheduleAssetReference).delete() session.query(TaskOutletAssetReference).delete() - if AIRFLOW_V_2_10_PLUS: - from tests_common.test_utils.compat import AssetAliasModel, DagScheduleAssetAliasReference + from tests_common.test_utils.compat import AssetAliasModel, DagScheduleAssetAliasReference - session.query(AssetAliasModel).delete() - session.query(DagScheduleAssetAliasReference).delete() + session.query(AssetAliasModel).delete() + session.query(DagScheduleAssetAliasReference).delete() if AIRFLOW_V_3_0_PLUS: from airflow.models.asset import ( AssetActive, diff --git a/devel-common/src/tests_common/test_utils/version_compat.py b/devel-common/src/tests_common/test_utils/version_compat.py index 7227de2d85962..2e990761628fb 100644 --- a/devel-common/src/tests_common/test_utils/version_compat.py +++ b/devel-common/src/tests_common/test_utils/version_compat.py @@ -32,6 +32,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
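For context, ``deserialize_operator`` in ``tests_common.test_utils.compat`` above now always delegates to ``BaseSerialization.deserialize``. A minimal round-trip sketch of what the shim supports, assuming an Airflow 2.10 test environment; the task id and the ``EmptyOperator`` import path are illustrative, not taken from this diff:

```python
# Round trip through the shim kept in tests_common.test_utils.compat.
# EmptyOperator's import path and the task_id are assumptions for
# illustration (the operator moved under the standard provider in 3.x).
from airflow.operators.empty import EmptyOperator
from airflow.serialization.serialized_objects import BaseSerialization

from tests_common.test_utils.compat import deserialize_operator

op = EmptyOperator(task_id="example")
restored = deserialize_operator(BaseSerialization.serialize(op))
assert restored.task_id == "example"
```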
index c74b23352f93f..bdd90f1d6baa5 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py @@ -26,7 +26,6 @@ from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -from airflow.providers.amazon.version_compat import AIRFLOW_V_2_10_PLUS from airflow.providers.common.sql.hooks.sql import DbApiHook if TYPE_CHECKING: @@ -260,6 +259,4 @@ def get_openlineage_database_dialect(self, connection: Connection) -> str: def get_openlineage_default_schema(self) -> str | None: """Return current schema. This is usually changed with ``SEARCH_PATH`` parameter.""" - if AIRFLOW_V_2_10_PLUS: - return self.get_first("SELECT CURRENT_SCHEMA();")[0] - return super().get_openlineage_default_schema() + return self.get_first("SELECT CURRENT_SCHEMA();")[0] diff --git a/providers/amazon/src/airflow/providers/amazon/version_compat.py b/providers/amazon/src/airflow/providers/amazon/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/amazon/src/airflow/providers/amazon/version_compat.py +++ b/providers/amazon/src/airflow/providers/amazon/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py b/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py index ab5f91ee2fb2b..ea0022282209b 100644 --- a/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py +++ b/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py @@ -18,8 +18,6 @@ from datetime import datetime -import pytest - from airflow.decorators import task from airflow.models.baseoperator import chain from airflow.models.dag import DAG @@ -28,7 +26,6 @@ ) from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS """ Prerequisites: The account which runs this test must manually have the following: @@ -42,8 +39,6 @@ Then, the SageMakerNotebookOperator will run a test notebook. This should spin up a SageMaker training job, run the notebook, and exit successfully. 
""" -pytestmark = pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow 2.10+") - DAG_ID = "example_sagemaker_unified_studio" # Externally fetched variables: diff --git a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py index 074d2babd8524..70cedadab341d 100644 --- a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py @@ -59,7 +59,7 @@ from tests_common import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -380,7 +380,6 @@ def test_stopped_tasks(self): class TestAwsEcsExecutor: """Tests the AWS ECS Executor.""" - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow 2.10+") @mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state") def test_execute(self, change_state_mock, mock_airflow_key, mock_executor, mock_cmd): """Test execution from end-to-end.""" diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py index bb5859516137f..ad31a7d9c074b 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py @@ -43,8 +43,6 @@ ) from airflow.utils.timezone import datetime -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS - @pytest.fixture def mocked_s3_res(): @@ -59,19 +57,17 @@ def s3_bucket(mocked_s3_res): return bucket -if AIRFLOW_V_2_10_PLUS: - - @pytest.fixture - def hook_lineage_collector(): - from airflow.lineage import hook - from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector +@pytest.fixture +def hook_lineage_collector(): + from airflow.lineage import hook + from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector - hook._hook_lineage_collector = None - hook._hook_lineage_collector = hook.HookLineageCollector() + hook._hook_lineage_collector = None + hook._hook_lineage_collector = hook.HookLineageCollector() - yield get_hook_lineage_collector() + yield get_hook_lineage_collector() - hook._hook_lineage_collector = None + hook._hook_lineage_collector = None class TestAwsS3Hook: @@ -448,7 +444,6 @@ def test_load_string(self, s3_bucket): resource = boto3.resource("s3").Object(s3_bucket, "my_key") assert resource.get()["Body"].read() == b"Cont\xc3\xa9nt" - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") def test_load_string_exposes_lineage(self, s3_bucket, hook_lineage_collector): hook = S3Hook() @@ -1023,7 +1018,6 @@ def test_load_file_gzip(self, s3_bucket, tmp_path): resource = boto3.resource("s3").Object(s3_bucket, "my_key") assert gz.decompress(resource.get()["Body"].read()) == b"Content" - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") def test_load_file_exposes_lineage(self, s3_bucket, tmp_path, hook_lineage_collector): hook = S3Hook() path = tmp_path / "testfile" @@ -1091,7 +1085,6 @@ def test_copy_object_no_acl( ACL="private", ) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock_aws def 
test_copy_object_ol_instrumentation(self, s3_bucket, hook_lineage_collector): mock_hook = S3Hook() @@ -1230,7 +1223,6 @@ def test_download_file(self, mock_temp_file, tmp_path): assert path.name == output_file - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch("airflow.providers.amazon.aws.hooks.s3.NamedTemporaryFile") def test_download_file_exposes_lineage(self, mock_temp_file, tmp_path, hook_lineage_collector): path = tmp_path / "airflow_tmp_test_s3_hook" @@ -1273,7 +1265,6 @@ def test_download_file_with_preserve_name(self, mock_open, tmp_path): mock_open.assert_called_once_with(path, "wb") - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch("airflow.providers.amazon.aws.hooks.s3.open") def test_download_file_with_preserve_name_exposes_lineage( self, mock_open, tmp_path, hook_lineage_collector diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index 3696f552d766b..e2897457c0576 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -44,7 +44,7 @@ from airflow.utils.timezone import datetime from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS def get_time_str(time_in_milliseconds): @@ -270,19 +270,13 @@ def test_read(self, monkeypatch): {"timestamp": current_time, "message": "Third"}, ], ) - if AIRFLOW_V_2_10_PLUS: - monkeypatch.setattr(self.cloudwatch_task_handler, "_read_from_logs_server", lambda a, b: ([], [])) - msg_template = textwrap.dedent(""" - INFO - ::group::Log message source details - *** Reading remote log from Cloudwatch log_group: {} log_stream: {} - INFO - ::endgroup:: - {} - """)[1:][:-1] # Strip off leading and trailing new lines, but not spaces - else: - msg_template = textwrap.dedent(""" - *** Reading remote log from Cloudwatch log_group: {} log_stream: {} - {} - """).strip() + monkeypatch.setattr(self.cloudwatch_task_handler, "_read_from_logs_server", lambda a, b: ([], [])) + msg_template = textwrap.dedent(""" + INFO - ::group::Log message source details + *** Reading remote log from Cloudwatch log_group: {} log_stream: {} + INFO - ::endgroup:: + {} + """)[1:][:-1] # Strip off leading and trailing new lines, but not spaces logs, metadata = self.cloudwatch_task_handler.read(self.ti) if AIRFLOW_V_3_0_PLUS: diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py index 1b124ee20280f..bbd4ade48d1ec 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py @@ -17,7 +17,7 @@ # under the License. 
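An aside on the ``hook_lineage_collector`` fixture un-gated in ``test_s3.py`` above: it swaps in a fresh ``HookLineageCollector`` so each test observes only its own lineage. A sketch of how a test consumes it, assuming the pytest fixtures from that file; the test name is hypothetical:

```python
# Hypothetical test using the fixtures above: the compat collector
# records the asset the hook writes, on both Airflow 2.10 and 3.x.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def test_load_string_collects_output(s3_bucket, hook_lineage_collector):
    hook = S3Hook()
    hook.load_string("payload", key="my_key", bucket_name=s3_bucket)
    assert len(hook_lineage_collector.collected_assets.outputs) == 1
```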
from __future__ import annotations -from unittest.mock import MagicMock, PropertyMock, call, patch +from unittest.mock import MagicMock, call, patch import pytest @@ -40,14 +40,13 @@ class TestRedshiftSQLOpenLineage: @patch.dict("os.environ", AIRFLOW_CONN_AWS_DEFAULT=f"aws://?region_name={MOCK_REGION_NAME}") @pytest.mark.parametrize( - "connection_host, connection_extra, expected_identity, is_over_210, expected_schemaname", + "connection_host, connection_extra, expected_identity, expected_schemaname", [ # test without a connection host but with a cluster_identifier in connection extra ( None, {"iam": True, "cluster_identifier": "cluster_identifier_from_extra"}, f"cluster_identifier_from_extra.{MOCK_REGION_NAME}", - True, "database.public", ), # test with a connection host and without a cluster_identifier in connection extra @@ -55,7 +54,6 @@ class TestRedshiftSQLOpenLineage: "cluster_identifier_from_host.id.my_region.redshift.amazonaws.com", {"iam": True}, "cluster_identifier_from_host.my_region", - True, "database.public", ), # test with both connection host and cluster_identifier in connection extra @@ -63,41 +61,20 @@ class TestRedshiftSQLOpenLineage: "cluster_identifier_from_host.x.y", {"iam": True, "cluster_identifier": "cluster_identifier_from_extra"}, f"cluster_identifier_from_extra.{MOCK_REGION_NAME}", - True, "database.public", ), - # test when hostname doesn't match pattern - ("1.2.3.4", {}, "1.2.3.4", True, "database.public"), - # test with Airflow below 2.10 not using Hook connection - ( - "cluster_identifier_from_host.id.my_region.redshift.amazonaws.com", - {"iam": True}, - "cluster_identifier_from_host.my_region", - False, - "public", - ), ], ) - @patch( - "airflow.providers.amazon.aws.hooks.redshift_sql.AIRFLOW_V_2_10_PLUS", - new_callable=PropertyMock, - ) - @patch("airflow.providers.openlineage.utils.utils.AIRFLOW_V_2_10_PLUS", new_callable=PropertyMock) @patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn") def test_execute_openlineage_events( self, mock_aws_hook_conn, - mock_ol_utils, - mock_redshift_sql, connection_host, connection_extra, expected_identity, - is_over_210, expected_schemaname, # self, mock_aws_hook_conn, connection_host, connection_extra, expected_identity, is_below_2_10, expected_schemaname ): - mock_ol_utils.__bool__ = lambda x: is_over_210 - mock_redshift_sql.__bool__ = lambda x: is_over_210 DB_NAME = "database" DB_SCHEMA_NAME = "public" @@ -176,97 +153,93 @@ def get_db_hook(self): dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = rows lineage = op.get_openlineage_facets_on_start() - if is_over_210: - assert dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [ - call( - "SELECT SVV_REDSHIFT_COLUMNS.schema_name, " - "SVV_REDSHIFT_COLUMNS.table_name, " - "SVV_REDSHIFT_COLUMNS.column_name, " - "SVV_REDSHIFT_COLUMNS.ordinal_position, " - "SVV_REDSHIFT_COLUMNS.data_type, " - "SVV_REDSHIFT_COLUMNS.database_name \n" - "FROM SVV_REDSHIFT_COLUMNS \n" - f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = '{expected_schemaname}' " - "AND SVV_REDSHIFT_COLUMNS.table_name IN ('little_table') " - "OR SVV_REDSHIFT_COLUMNS.database_name = 'another_db' " - "AND SVV_REDSHIFT_COLUMNS.schema_name = 'another_schema' AND " - "SVV_REDSHIFT_COLUMNS.table_name IN ('popular_orders_day_of_week')" - ), - call( - "SELECT SVV_REDSHIFT_COLUMNS.schema_name, " - "SVV_REDSHIFT_COLUMNS.table_name, " - "SVV_REDSHIFT_COLUMNS.column_name, " - "SVV_REDSHIFT_COLUMNS.ordinal_position, " - "SVV_REDSHIFT_COLUMNS.data_type, " - 
"SVV_REDSHIFT_COLUMNS.database_name \n" - "FROM SVV_REDSHIFT_COLUMNS \n" - f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = '{expected_schemaname}' " - "AND SVV_REDSHIFT_COLUMNS.table_name IN ('Test_table')" - ), - ] - else: - assert dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [] + assert dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [ + call( + "SELECT SVV_REDSHIFT_COLUMNS.schema_name, " + "SVV_REDSHIFT_COLUMNS.table_name, " + "SVV_REDSHIFT_COLUMNS.column_name, " + "SVV_REDSHIFT_COLUMNS.ordinal_position, " + "SVV_REDSHIFT_COLUMNS.data_type, " + "SVV_REDSHIFT_COLUMNS.database_name \n" + "FROM SVV_REDSHIFT_COLUMNS \n" + f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = '{expected_schemaname}' " + "AND SVV_REDSHIFT_COLUMNS.table_name IN ('little_table') " + "OR SVV_REDSHIFT_COLUMNS.database_name = 'another_db' " + "AND SVV_REDSHIFT_COLUMNS.schema_name = 'another_schema' AND " + "SVV_REDSHIFT_COLUMNS.table_name IN ('popular_orders_day_of_week')" + ), + call( + "SELECT SVV_REDSHIFT_COLUMNS.schema_name, " + "SVV_REDSHIFT_COLUMNS.table_name, " + "SVV_REDSHIFT_COLUMNS.column_name, " + "SVV_REDSHIFT_COLUMNS.ordinal_position, " + "SVV_REDSHIFT_COLUMNS.data_type, " + "SVV_REDSHIFT_COLUMNS.database_name \n" + "FROM SVV_REDSHIFT_COLUMNS \n" + f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = '{expected_schemaname}' " + "AND SVV_REDSHIFT_COLUMNS.table_name IN ('Test_table')" + ), + ] expected_namespace = f"redshift://{expected_identity}:5439" - if is_over_210: - assert lineage.inputs == [ - Dataset( - namespace=expected_namespace, - name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.popular_orders_day_of_week", - facets={ - "schema": SchemaDatasetFacet( - fields=[ - SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"), - SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"), - SchemaDatasetFacetFields(name="orders_placed", type="int4"), - ] - ) - }, - ), - Dataset( - namespace=expected_namespace, - name=f"{DB_NAME}.{DB_SCHEMA_NAME}.little_table", - facets={ - "schema": SchemaDatasetFacet( - fields=[ - SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"), - SchemaDatasetFacetFields(name="additional_constant", type="varchar"), - ] - ) - }, - ), - ] - assert lineage.outputs == [ - Dataset( - namespace=expected_namespace, - name=f"{DB_NAME}.{DB_SCHEMA_NAME}.test_table", - facets={ - "schema": SchemaDatasetFacet( - fields=[ - SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"), - SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"), - SchemaDatasetFacetFields(name="orders_placed", type="int4"), - SchemaDatasetFacetFields(name="additional_constant", type="varchar"), - ] - ), - "columnLineage": ColumnLineageDatasetFacet( - fields={ - "additional_constant": Fields( - inputFields=[ - InputField( - namespace=expected_namespace, - name="database.public.little_table", - field="additional_constant", - ) - ], - transformationDescription="", - transformationType="", - ) - } - ), - }, - ) - ] + assert lineage.inputs == [ + Dataset( + namespace=expected_namespace, + name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.popular_orders_day_of_week", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"), + SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"), + SchemaDatasetFacetFields(name="orders_placed", type="int4"), + ] + ) + }, + ), + Dataset( + namespace=expected_namespace, + name=f"{DB_NAME}.{DB_SCHEMA_NAME}.little_table", + facets={ + 
"schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"), + SchemaDatasetFacetFields(name="additional_constant", type="varchar"), + ] + ) + }, + ), + ] + assert lineage.outputs == [ + Dataset( + namespace=expected_namespace, + name=f"{DB_NAME}.{DB_SCHEMA_NAME}.test_table", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"), + SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"), + SchemaDatasetFacetFields(name="orders_placed", type="int4"), + SchemaDatasetFacetFields(name="additional_constant", type="varchar"), + ] + ), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "additional_constant": Fields( + inputFields=[ + InputField( + namespace=expected_namespace, + name="database.public.little_table", + field="additional_constant", + ) + ], + transformationDescription="", + transformationType="", + ) + } + ), + }, + ) + ] assert lineage.job_facets == {"sql": SQLJobFacet(query=sql)} diff --git a/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py b/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py +++ b/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py b/providers/celery/tests/unit/celery/cli/test_celery_command.py index b10a7afa12681..cae229fd3b5f2 100644 --- a/providers/celery/tests/unit/celery/cli/test_celery_command.py +++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py @@ -31,7 +31,7 @@ from airflow.providers.celery.cli.celery_command import _run_stale_bundle_cleanup from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -313,13 +313,6 @@ def _test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_loca stderr="/tmp/flower-stderr.log", log="/tmp/flower.log", ) - if AIRFLOW_V_2_10_PLUS - else mock.call( - process="flower", - stdout="/tmp/flower-stdout.log", - stderr="/tmp/flower-stderr.log", - log="/tmp/flower.log", - ) ] mock_pid_file.assert_has_calls([mock.call(mock_setup_locations.return_value[0], -1)]) assert mock_open.mock_calls == [ diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py index 22dbd59a914c1..974dc2b06f72e 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py @@ -44,7 +44,7 @@ from tests_common.test_utils import db from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -226,8 +226,8 @@ def test_try_adopt_task_instances(self): not_adopted_tis = 
executor.try_adopt_task_instances(tis) - key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0 if AIRFLOW_V_2_10_PLUS else 1) - key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0 if AIRFLOW_V_2_10_PLUS else 1) + key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0) + key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0) assert executor.running == {key_1, key_2} assert executor.tasks == {key_1: AsyncResult("231"), key_2: AsyncResult("232")} diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py b/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py index 01fb1ab201828..b2b3c3e86e248 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py +++ b/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py @@ -20,7 +20,6 @@ from typing import TYPE_CHECKING from airflow.providers.common.compat.version_compat import ( - AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS, ) @@ -38,16 +37,12 @@ from airflow.auth.managers.models.resource_details import DatasetDetails as AssetDetails from airflow.datasets import ( Dataset as Asset, + DatasetAlias as AssetAlias, DatasetAll as AssetAll, DatasetAny as AssetAny, + expand_alias_to_datasets as expand_alias_to_assets, ) - if AIRFLOW_V_2_10_PLUS: - from airflow.datasets import ( - DatasetAlias as AssetAlias, - expand_alias_to_datasets as expand_alias_to_assets, - ) - __all__ = [ "Asset", diff --git a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py index 1b18723c45736..bf7f1705c6db8 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py +++ b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py @@ -16,7 +16,7 @@ # under the License. 
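The compat ``assets`` package simplified above keeps one vocabulary across versions by import-aliasing; a condensed sketch of the pattern, where the 3.x import location shown is an assumption for illustration:

```python
# Condensed version of the aliasing pattern in the compat assets module.
# The airflow.sdk location for 3.x is an assumption here; the 2.10
# branch mirrors the ModuleNotFoundError fallback shown in the diff.
try:
    from airflow.sdk import Asset, AssetAlias  # Airflow 3.x names
except ModuleNotFoundError:
    from airflow.datasets import (
        Dataset as Asset,  # Airflow 2.10: dataset classes re-exported as assets
        DatasetAlias as AssetAlias,
    )
```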
from __future__ import annotations -from airflow.providers.common.compat.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS def _get_asset_compat_hook_lineage_collector(): @@ -85,9 +85,7 @@ def get_hook_lineage_collector(): return get_hook_lineage_collector() - # HookLineageCollector added in 2.10 - if AIRFLOW_V_2_10_PLUS: - return _get_asset_compat_hook_lineage_collector() + return _get_asset_compat_hook_lineage_collector() # For the case that airflow has not yet upgraded to 2.10 or higher, # but using the providers that already uses `get_hook_lineage_collector` diff --git a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py index 0e34419043f28..c53e8b53e92a9 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py +++ b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py @@ -19,8 +19,6 @@ from typing import TYPE_CHECKING -from airflow.providers.common.compat.version_compat import AIRFLOW_V_2_10_PLUS - if TYPE_CHECKING: from airflow.providers.standard.operators.python import ( _SERIALIZERS, @@ -38,13 +36,11 @@ ) except ModuleNotFoundError: from airflow.operators.python import ( + _SERIALIZERS, PythonOperator, ShortCircuitOperator, get_current_context, ) - if AIRFLOW_V_2_10_PLUS: - from airflow.operators.python import _SERIALIZERS - __all__ = ["PythonOperator", "_SERIALIZERS", "ShortCircuitOperator", "get_current_context"] diff --git a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py +++ b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/common/io/src/airflow/providers/common/io/version_compat.py b/providers/common/io/src/airflow/providers/common/io/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/common/io/src/airflow/providers/common/io/version_compat.py +++ b/providers/common/io/src/airflow/providers/common/io/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/databricks/src/airflow/providers/databricks/version_compat.py b/providers/databricks/src/airflow/providers/databricks/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/databricks/src/airflow/providers/databricks/version_compat.py +++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = 
get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py index 983ee48e63c99..b9e56f30f3faf 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py @@ -22,7 +22,7 @@ from typing import TYPE_CHECKING from airflow.providers.common.compat.openlineage.check import require_openlineage_version -from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance @@ -48,13 +48,6 @@ def _get_logical_date(task_instance): return date -def _get_try_number(val): - # todo: remove when min airflow version >= 2.10.0 - if AIRFLOW_V_2_10_PLUS: - return val.try_number - return val.try_number - 1 - - @require_openlineage_version(provider_min_version="2.0.0") def generate_openlineage_events_from_dbt_cloud_run( operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance @@ -144,7 +137,7 @@ async def get_artifacts_for_steps(steps, artifacts): dag_id=task_instance.dag_id, task_id=operator.task_id, logical_date=_get_logical_date(task_instance), - try_number=_get_try_number(task_instance), + try_number=task_instance.try_number, map_index=task_instance.map_index, ) diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/docker/src/airflow/providers/docker/version_compat.py b/providers/docker/src/airflow/providers/docker/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/docker/src/airflow/providers/docker/version_compat.py +++ b/providers/docker/src/airflow/providers/docker/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/edge3/src/airflow/providers/edge3/version_compat.py b/providers/edge3/src/airflow/providers/edge3/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/edge3/src/airflow/providers/edge3/version_compat.py +++ b/providers/edge3/src/airflow/providers/edge3/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py 
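The ``_get_try_number`` helper deleted from the dbt Cloud OpenLineage utils above existed because, before Airflow 2.10, ``TaskInstance.try_number`` ran one ahead of the attempt being reported at event time; 2.10 made the attribute stable, so callers now read it directly. A self-contained sketch with a stand-in object (``FakeTI`` is not an Airflow class):

```python
# Why the shim existed: pre-2.10 the raw value was one ahead at event
# time, so shims subtracted one; from 2.10 on it is used as-is.
from dataclasses import dataclass


@dataclass
class FakeTI:  # stand-in for a TaskInstance, illustration only
    try_number: int


def _get_try_number(ti: FakeTI, airflow_v_2_10_plus: bool) -> int:
    return ti.try_number if airflow_v_2_10_plus else ti.try_number - 1


ti = FakeTI(try_number=1)
assert _get_try_number(ti, airflow_v_2_10_plus=True) == ti.try_number
```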
index 21e7170194e36..48d122b669696 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/google/src/airflow/providers/google/version_compat.py b/providers/google/src/airflow/providers/google/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/google/src/airflow/providers/google/version_compat.py +++ b/providers/google/src/airflow/providers/google/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py index 45eb7cf2d68a5..d7febaada8618 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py @@ -52,8 +52,6 @@ split_tablename, ) -from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS - pytestmark = pytest.mark.filterwarnings("error::airflow.exceptions.AirflowProviderDeprecationWarning") PROJECT_ID = "bq-project" @@ -1933,7 +1931,6 @@ def test_get_records_as_dict(self): assert result == [{"f0_": 22, "f1_": 3.14, "f2_": "PI"}] -@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @pytest.mark.db_test class TestHookLevelLineage(_BigQueryBaseTestClass): @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client") diff --git a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py index 5156ca6d39500..326209ee1d090 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py @@ -43,7 +43,6 @@ from airflow.utils import timezone from airflow.version import version -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS from unit.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}" @@ -411,7 +410,6 @@ def test_copy_empty_source_object(self): assert str(ctx.value) == "source_bucket and source_object cannot be empty." - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch("google.cloud.storage.Bucket.copy_blob") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_copy_exposes_lineage(self, mock_service, mock_copy, hook_lineage_collector): @@ -507,7 +505,6 @@ def test_rewrite_empty_source_object(self): assert str(ctx.value) == "source_bucket and source_object cannot be empty." 
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_rewrite_exposes_lineage(self, mock_service, hook_lineage_collector): source_bucket_name = "test-source-bucket" @@ -567,7 +564,6 @@ def test_delete_nonexisting_object(self, mock_service): with pytest.raises(exceptions.NotFound): self.gcs_hook.delete(bucket_name=test_bucket, object_name=test_object) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_delete_exposes_lineage(self, mock_service, hook_lineage_collector): test_bucket = "test_bucket" @@ -818,7 +814,6 @@ def test_compose_without_destination_object(self, mock_service): assert str(ctx.value) == "bucket_name and destination_object cannot be empty." - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_compose_exposes_lineage(self, mock_service, hook_lineage_collector): test_bucket = "test_bucket" @@ -859,7 +854,6 @@ def test_download_as_bytes(self, mock_service): assert response == test_object_bytes download_method.assert_called_once_with() - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch("google.cloud.storage.Blob.download_as_bytes") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_download_as_bytes_exposes_lineage(self, mock_service, mock_download, hook_lineage_collector): @@ -899,7 +893,6 @@ def test_download_to_file(self, mock_service): assert response == test_file download_filename_method.assert_called_once_with(test_file, timeout=60) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch("google.cloud.storage.Blob.download_to_filename") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_download_to_file_exposes_lineage(self, mock_service, mock_download, hook_lineage_collector): @@ -1154,7 +1147,6 @@ def test_upload_file(self, mock_service, testdata_file): assert metadata == blob_object.return_value.metadata - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch("google.cloud.storage.Blob.upload_from_filename") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_upload_file_exposes_lineage(self, mock_service, mock_upload, hook_lineage_collector): @@ -1218,7 +1210,6 @@ def test_upload_data_bytes(self, mock_service, testdata_bytes): upload_method.assert_called_once_with(testdata_bytes, content_type="text/plain", timeout=60) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") @mock.patch("google.cloud.storage.Blob.upload_from_string") @mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_upload_data_exposes_lineage(self, mock_service, mock_upload, hook_lineage_collector): diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro 
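Every per-provider ``version_compat`` module in this diff now exports only ``AIRFLOW_V_3_0_PLUS``; a sketch of how provider code typically consumes the remaining gate (the 3.x import location is an assumption for illustration):

```python
# Typical consumption of the remaining gate; the airflow.sdk import
# location for 3.x is an assumption, not taken from this diff.
from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
    from airflow.sdk import BaseOperator
else:
    from airflow.models import BaseOperator
```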
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py index e33c71f39f10a..6ae597ba0e31c 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py @@ -33,7 +33,6 @@ from tests_common.test_utils.file_loading import load_file_from_resources, load_json_from_resources from tests_common.test_utils.mock_context import mock_context from tests_common.test_utils.operators.run_deferrable import execute_operator -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS from unit.microsoft.azure.base import Base from unit.microsoft.azure.test_utils import mock_json_response, mock_response @@ -261,7 +260,6 @@ def test_execute_when_response_is_bytes(self): assert events[0].payload["response"] == base64_encoded_content @pytest.mark.db_test - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters works in Airflow >= 2.10.0") def test_execute_with_lambda_parameter_when_response_is_bytes(self): content = load_file_from_resources( dirname(__file__), "..", "resources", "dummy.pdf", mode="rb", encoding=None diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py b/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py index c73c76caffe79..055165d4fc70a 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py @@ -28,7 +28,6 @@ from tests_common.test_utils.file_loading import load_json_from_resources from tests_common.test_utils.operators.run_deferrable import execute_operator -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS from unit.microsoft.azure.base import Base from unit.microsoft.azure.test_utils import mock_json_response @@ -102,7 +101,6 @@ def test_execute_with_result_processor_with_new_signature(self): assert events[2].payload["type"] == "builtins.dict" assert events[2].payload["response"] == json.dumps(status[1]) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters works in Airflow >= 2.10.0") def test_execute_with_lambda_parameter_and_result_processor_with_new_signature(self): status = load_json_from_resources(dirname(__file__), "..", "resources", "status.json") response = mock_json_response(200, *status) diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 545ab651ad388..11228c0d10fcf 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -33,7 +33,6 @@ from airflow.providers.openlineage.extractors import ExtractorManager, OperatorLineage from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState from airflow.providers.openlineage.utils.utils import ( - AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS, get_airflow_dag_run_facet, get_airflow_debug_facet, @@ -59,13 +58,6 @@ _openlineage_listener: OpenLineageListener | None = None -def _get_try_number_success(val): - # todo: remove when min airflow version >= 2.10.0 - if AIRFLOW_V_2_10_PLUS: 
- return val.try_number - return val.try_number - 1 - - def _executor_initializer(): """ Initialize processes for the executor used with DAGRun listener's methods (on scheduler). @@ -304,7 +296,7 @@ def on_success(): task_uuid = self.adapter.build_task_instance_run_id( dag_id=dag.dag_id, task_id=task.task_id, - try_number=_get_try_number_success(task_instance), + try_number=task_instance.try_number, logical_date=date, map_index=task_instance.map_index, ) @@ -366,8 +358,7 @@ def on_task_instance_failed( dagrun = context["dag_run"] dag = context["dag"] self._on_task_instance_failed(task_instance, dag, dagrun, task, error) - - elif AIRFLOW_V_2_10_PLUS: + else: @hookimpl def on_task_instance_failed( @@ -382,19 +373,6 @@ def on_task_instance_failed( if TYPE_CHECKING: assert task self._on_task_instance_failed(task_instance, task.dag, task_instance.dag_run, task, error) - else: - - @hookimpl - def on_task_instance_failed( - self, - previous_state: TaskInstanceState, - task_instance: TaskInstance, - session: Session, # type: ignore[valid-type] - ) -> None: - task = task_instance.task - if TYPE_CHECKING: - assert task - self._on_task_instance_failed(task_instance, task.dag, task_instance.dag_run, task) def _on_task_instance_failed( self, @@ -651,10 +629,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None: self.log.debug("Executor have not started before `on_dag_run_success`") return - if AIRFLOW_V_2_10_PLUS: - task_ids = DagRun._get_partial_task_ids(dag_run.dag) - else: - task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None + task_ids = DagRun._get_partial_task_ids(dag_run.dag) date = dag_run.logical_date if AIRFLOW_V_3_0_PLUS and date is None: @@ -690,10 +665,7 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None: self.log.debug("Executor have not started before `on_dag_run_failed`") return - if AIRFLOW_V_2_10_PLUS: - task_ids = DagRun._get_partial_task_ids(dag_run.dag) - else: - task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None + task_ids = DagRun._get_partial_task_ids(dag_run.dag) date = dag_run.logical_date if AIRFLOW_V_3_0_PLUS and date is None: diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py index 36c2665754693..b1f465a940961 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py @@ -25,7 +25,6 @@ lineage_parent_id, lineage_run_id, ) -from airflow.providers.openlineage.version_compat import AIRFLOW_V_2_10_PLUS class OpenLineageProviderPlugin(AirflowPlugin): @@ -40,10 +39,9 @@ class OpenLineageProviderPlugin(AirflowPlugin): if not conf.is_disabled(): macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id] listeners = [get_openlineage_listener()] - if AIRFLOW_V_2_10_PLUS: - from airflow.lineage.hook import HookLineageReader + from airflow.lineage.hook import HookLineageReader - hook_lineage_readers = [HookLineageReader] + hook_lineage_readers = [HookLineageReader] else: macros = [] listeners = [] diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py index 454b164d0f9ff..2567cb418027b 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py +++ 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py @@ -50,7 +50,7 @@ is_dag_lineage_enabled, is_task_lineage_enabled, ) -from airflow.providers.openlineage.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS from airflow.sensors.base import BaseSensorOperator from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils.module_loading import import_string @@ -803,12 +803,6 @@ def get_filtered_unknown_operator_keys(operator: BaseOperator) -> dict: def should_use_external_connection(hook) -> bool: # If we're at Airflow 2.10, the execution is process-isolated, so we can safely run those again. - if not AIRFLOW_V_2_10_PLUS: - return hook.__class__.__name__ not in [ - "SnowflakeHook", - "SnowflakeSqlApiHook", - "RedshiftSQLHook", - ] return True diff --git a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py +++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py index 73f2c0d7b3db6..55e11588343d7 100644 --- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py +++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py @@ -38,7 +38,7 @@ from tests_common.test_utils.compat import DateTimeSensor, PythonOperator from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: try: @@ -53,21 +53,19 @@ AssetEventDagRunReference = TIRunContext = Any # type: ignore[misc, assignment] -if AIRFLOW_V_2_10_PLUS: - - @pytest.fixture - def hook_lineage_collector(): - from airflow.lineage import hook - from airflow.providers.common.compat.lineage.hook import ( - get_hook_lineage_collector, - ) +@pytest.fixture +def hook_lineage_collector(): + from airflow.lineage import hook + from airflow.providers.common.compat.lineage.hook import ( + get_hook_lineage_collector, + ) - hook._hook_lineage_collector = None - hook._hook_lineage_collector = hook.HookLineageCollector() + hook._hook_lineage_collector = None + hook._hook_lineage_collector = hook.HookLineageCollector() - yield get_hook_lineage_collector() + yield get_hook_lineage_collector() - hook._hook_lineage_collector = None + hook._hook_lineage_collector = None if AIRFLOW_V_3_0_PLUS: @@ -281,7 +279,6 @@ def test_convert_to_ol_dataset_table(): @skip_if_force_lowest_dependencies_marker -@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") def test_extractor_manager_uses_hook_level_lineage(hook_lineage_collector): dagrun = MagicMock() task = MagicMock() @@ -300,7 +297,6 @@ def test_extractor_manager_uses_hook_level_lineage(hook_lineage_collector): assert 
metadata.outputs == [OpenLineageDataset(namespace="s3://bucket", name="output_key")] -@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0") def test_extractor_manager_does_not_use_hook_level_lineage_when_operator( hook_lineage_collector, ): @@ -330,8 +326,8 @@ def get_openlineage_facets_on_start(self): @pytest.mark.db_test @pytest.mark.skipif( - not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, - reason="Test for hook level lineage in Airflow >= 2.10.0 < 3.0", + AIRFLOW_V_3_0_PLUS, + reason="Test for hook level lineage in Airflow < 3.0", ) def test_extractor_manager_gets_data_from_pythonoperator(session, dag_maker, hook_lineage_collector): path = None diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py index 9f12994e72842..88de93e6e2877 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py @@ -37,7 +37,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: from airflow.utils.types import DagRunTriggeredByType @@ -68,9 +68,7 @@ def has_value_in_events(events, chain, value): with tempfile.TemporaryDirectory(prefix="venv") as tmp_dir: listener_path = Path(tmp_dir) / "event" - @pytest.mark.skipif( - not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow>=2.10<3.0" - ) + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow<3.0") @pytest.mark.usefixtures("reset_logging_config") class TestOpenLineageExecution: def teardown_method(self): diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index 7ece6038c115b..3dc2c938835af 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -44,18 +44,17 @@ from tests_common.test_utils.compat import EmptyOperator, PythonOperator from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test -EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0 - -TRY_NUMBER_BEFORE_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 1 -TRY_NUMBER_RUNNING = 0 if AIRFLOW_V_2_10_PLUS else 1 -TRY_NUMBER_FAILED = 0 if AIRFLOW_V_2_10_PLUS else 1 -TRY_NUMBER_SUCCESS = 0 if AIRFLOW_V_2_10_PLUS else 2 -TRY_NUMBER_AFTER_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 2 +EXPECTED_TRY_NUMBER_1 = 1 +TRY_NUMBER_BEFORE_EXECUTION = 0 +TRY_NUMBER_RUNNING = 0 +TRY_NUMBER_FAILED = 0 +TRY_NUMBER_SUCCESS = 0 +TRY_NUMBER_AFTER_EXECUTION = 0 if TYPE_CHECKING: from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance @@ -356,8 +355,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mock_disabled.return_value = False err = ValueError("test") - on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS else {} - expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None} + on_task_failed_listener_kwargs = 
{"error": err} + expected_err_kwargs = {"error": err} listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_listener_kwargs, session=None @@ -460,7 +459,7 @@ def test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth parameters derived from the task instance. """ listener, task_instance = self._create_listener_and_task_instance() - on_task_failed_kwargs = {"error": ValueError("test")} if AIRFLOW_V_2_10_PLUS else {} + on_task_failed_kwargs = {"error": ValueError("test")} listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_kwargs, session=None @@ -582,7 +581,7 @@ def test_listener_on_task_instance_failed_do_not_call_adapter_when_disabled_oper mock_get_user_provided_run_facets.return_value = {"custom_facet": 2} mock_disabled.return_value = True - on_task_failed_kwargs = {"error": ValueError("test")} if AIRFLOW_V_2_10_PLUS else {} + on_task_failed_kwargs = {"error": ValueError("test")} listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_kwargs, session=None @@ -1011,8 +1010,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mock_disabled.return_value = False err = ValueError("test") - on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS else {} - expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None} + on_task_failed_listener_kwargs = {"error": err} + expected_err_kwargs = {"error": err} listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_listener_kwargs @@ -1054,8 +1053,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ mock_get_job_name.return_value = "job_name" err = ValueError("test") - on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS else {} - expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None} + on_task_failed_listener_kwargs = {"error": err} + expected_err_kwargs = {"error": err} listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_listener_kwargs @@ -1185,7 +1184,7 @@ def test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth parameters derived from the task instance. 
""" listener, task_instance = self._create_listener_and_task_instance() - on_task_failed_kwargs = {"error": ValueError("test")} if AIRFLOW_V_2_10_PLUS else {} + on_task_failed_kwargs = {"error": ValueError("test")} listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_kwargs @@ -1248,7 +1247,7 @@ def test_listener_on_task_instance_failed_do_not_call_adapter_when_disabled_oper mock_get_user_provided_run_facets.return_value = {"custom_facet": 2} mock_disabled.return_value = True - on_task_failed_kwargs = {"error": ValueError("test")} if AIRFLOW_V_2_10_PLUS else {} + on_task_failed_kwargs = {"error": ValueError("test")} listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_kwargs @@ -1427,7 +1426,7 @@ def test_listener_with_task_enabled( if enable_task: enable_lineage(self.task_1) - on_task_failed_kwargs = {"error": ValueError("test")} if AIRFLOW_V_2_10_PLUS else {} + on_task_failed_kwargs = {"error": ValueError("test")} with conf_vars({("openlineage", "selective_enable"): selective_enable}): listener = OpenLineageListener() @@ -1485,7 +1484,7 @@ def test_listener_with_dag_disabled_task_enabled( if enable_task: enable_lineage(self.task_1) - on_task_failed_kwargs = {"error": ValueError("test")} if AIRFLOW_V_2_10_PLUS else {} + on_task_failed_kwargs = {"error": ValueError("test")} with conf_vars({("openlineage", "selective_enable"): selective_enable}): listener = OpenLineageListener() diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py index 4d7944a0ad640..ffbb10c8c48fe 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py @@ -51,7 +51,7 @@ from tests_common.test_utils.compat import ( BashOperator, ) -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: from airflow.utils.types import DagRunTriggeredByType @@ -562,7 +562,7 @@ def test_serialize_timetable_with_dataset_or_time_schedule(): @pytest.mark.skipif( - not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_0_PLUS, reason="This test checks serialization only in 2.10 conditions", ) def test_serialize_timetable_2_10_complex_with_alias(): @@ -600,7 +600,7 @@ def test_serialize_timetable_2_10_complex_with_alias(): @pytest.mark.skipif( - not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_0_PLUS, reason="This test checks serialization only in 2.10 conditions", ) def test_serialize_timetable_2_10_single_asset(): @@ -612,7 +612,7 @@ def test_serialize_timetable_2_10_single_asset(): @pytest.mark.skipif( - not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_0_PLUS, reason="This test checks serialization only in 2.10 conditions", ) def test_serialize_timetable_2_10_list_of_assets(): @@ -630,7 +630,7 @@ def test_serialize_timetable_2_10_list_of_assets(): @pytest.mark.skipif( - not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_0_PLUS, reason="This test checks serialization only in 2.10 conditions", ) def test_serialize_timetable_2_10_with_complex_logical_condition(): @@ -665,7 +665,7 @@ def test_serialize_timetable_2_10_with_complex_logical_condition(): @pytest.mark.skipif( - not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_0_PLUS, reason="This test checks serialization only in 2.10 
conditions", ) def test_serialize_timetable_2_10_with_dataset_or_time_schedule(): @@ -709,102 +709,6 @@ def test_serialize_timetable_2_10_with_dataset_or_time_schedule(): } -@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions") -def test_serialize_timetable_2_9_single_asset(): - dag = DAG(dag_id="test", start_date=datetime.datetime(2025, 1, 1), schedule=Asset("a")) - dag_info = DagInfo(dag) - assert dag_info.timetable == {"dataset_condition": {"__type": "dataset", "uri": "a", "extra": None}} - - -@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions") -def test_serialize_timetable_2_9_list_of_assets(): - dag = DAG(dag_id="test", start_date=datetime.datetime(2025, 1, 1), schedule=[Asset("a"), Asset("b")]) - dag_info = DagInfo(dag) - assert dag_info.timetable == { - "dataset_condition": { - "__type": "dataset_all", - "objects": [ - {"__type": "dataset", "extra": None, "uri": "a"}, - {"__type": "dataset", "extra": None, "uri": "b"}, - ], - } - } - - -@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions") -def test_serialize_timetable_2_9_with_complex_logical_condition(): - dag = DAG( - dag_id="test", - start_date=datetime.datetime(2025, 1, 1), - schedule=(Asset("ds1", extra={"some_extra": 1}) | Asset("ds2")) - & (Asset("ds3") | Asset("ds4", extra={"another_extra": 345})), - ) - dag_info = DagInfo(dag) - assert dag_info.timetable == { - "dataset_condition": { - "__type": "dataset_all", - "objects": [ - { - "__type": "dataset_any", - "objects": [ - {"__type": "dataset", "uri": "ds1", "extra": {"some_extra": 1}}, - {"__type": "dataset", "uri": "ds2", "extra": None}, - ], - }, - { - "__type": "dataset_any", - "objects": [ - {"__type": "dataset", "uri": "ds3", "extra": None}, - {"__type": "dataset", "uri": "ds4", "extra": {"another_extra": 345}}, - ], - }, - ], - } - } - - -@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions") -def test_serialize_timetable_2_9_with_dataset_or_time_schedule(): - from airflow.timetables.datasets import DatasetOrTimeSchedule - from airflow.timetables.trigger import CronTriggerTimetable - - dag = DAG( - dag_id="test", - start_date=datetime.datetime(2025, 1, 1), - schedule=DatasetOrTimeSchedule( - timetable=CronTriggerTimetable("0 0 * 3 *", timezone="UTC"), - datasets=(Asset("ds1", extra={"some_extra": 1}) | Asset("ds2")) - & (Asset("ds3") | Asset("ds4", extra={"another_extra": 345})), - ), - ) - dag_info = DagInfo(dag) - assert dag_info.timetable == { - "timetable": { - "__type": "airflow.timetables.trigger.CronTriggerTimetable", - "__var": {"expression": "0 0 * 3 *", "timezone": "UTC", "interval": 0.0}, - }, - "dataset_condition": { - "__type": "dataset_all", - "objects": [ - { - "__type": "dataset_any", - "objects": [ - {"__type": "dataset", "uri": "ds1", "extra": {"some_extra": 1}}, - {"__type": "dataset", "uri": "ds2", "extra": None}, - ], - }, - { - "__type": "dataset_any", - "objects": [ - {"__type": "dataset", "uri": "ds3", "extra": None}, - {"__type": "dataset", "uri": "ds4", "extra": {"another_extra": 345}}, - ], - }, - ], - }, - } - - @pytest.mark.parametrize( ("airflow_version", "ol_version"), [ diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index 07b1a90ff9a6a..27b64daa63031 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ 
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -64,7 +64,7 @@ from tests_common.test_utils.compat import BashOperator, PythonOperator from tests_common.test_utils.mock_operators import MockOperator -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash" PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python" @@ -1026,7 +1026,7 @@ def test_dag_info_schedule_dataset_or_time_schedule(self): } -@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, reason="Airflow 2.10 tests") +@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow < 3.0 tests") class TestDagInfoAirflow210: def test_dag_info_schedule_single_dataset_directly(self): dag = DAG( diff --git a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py +++ b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/presto/src/airflow/providers/presto/version_compat.py b/providers/presto/src/airflow/providers/presto/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/presto/src/airflow/providers/presto/version_compat.py +++ b/providers/presto/src/airflow/providers/presto/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/redis/src/airflow/providers/redis/version_compat.py b/providers/redis/src/airflow/providers/redis/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/redis/src/airflow/providers/redis/version_compat.py +++ b/providers/redis/src/airflow/providers/redis/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/sftp/src/airflow/providers/sftp/version_compat.py b/providers/sftp/src/airflow/providers/sftp/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/sftp/src/airflow/providers/sftp/version_compat.py +++ b/providers/sftp/src/airflow/providers/sftp/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/smtp/tests/unit/smtp/notifications/test_smtp.py b/providers/smtp/tests/unit/smtp/notifications/test_smtp.py index bacabf87b4d35..ff3c02c4da956 100644 --- 
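All of the two-sided range markers above reduce to a single boolean skip: with 2.10 as the minimum, "2.10 behaviour" now just means "anything below 3.0". The resulting pattern, extracted for clarity; the reusable marker name is made up here, since the tests above inline the expression:

    import pytest

    from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

    runs_only_on_airflow_2 = pytest.mark.skipif(
        AIRFLOW_V_3_0_PLUS, reason="This test checks serialization only in 2.10 conditions"
    )


    @runs_only_on_airflow_2
    def test_pre_3_serialization_shape():
        # Illustrative assertion in the spirit of the timetable tests above.
        condition = {"__type": "dataset", "uri": "a", "extra": None}
        assert condition["__type"] == "dataset"
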
diff --git a/providers/smtp/tests/unit/smtp/notifications/test_smtp.py b/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
index bacabf87b4d35..ff3c02c4da956 100644
--- a/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
+++ b/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
@@ -30,14 +30,12 @@
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.utils import timezone

-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
 pytestmark = pytest.mark.db_test

 SMTP_API_DEFAULT_CONN_ID = SmtpHook.default_conn_name

-NUM_TRY = 0 if AIRFLOW_V_2_10_PLUS else 1
+NUM_TRY = 0


 class TestSmtpNotifier:
diff --git a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
index 1a3d3f018363f..14d682c50cb9b 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
@@ -23,9 +23,8 @@
 from urllib.parse import quote, urlparse, urlunparse

 from airflow.providers.common.compat.openlineage.check import require_openlineage_version
-from airflow.providers.snowflake.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
+from airflow.providers.snowflake.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils import timezone
-from airflow.utils.state import TaskInstanceState

 if TYPE_CHECKING:
     from openlineage.client.event_v2 import RunEvent
@@ -122,21 +121,12 @@ def _get_logical_date():

         return date

-    def _get_try_number_success():
-        """We are running this in the _on_complete, so need to adjust for try_num changes."""
-        # todo: remove when min airflow version >= 2.10.0
-        if AIRFLOW_V_2_10_PLUS:
-            return task_instance.try_number
-        if task_instance.state == TaskInstanceState.SUCCESS:
-            return task_instance.try_number - 1
-        return task_instance.try_number
-
     # Generate same OL run id as is generated for current task instance
     return OpenLineageAdapter.build_task_instance_run_id(
         dag_id=task_instance.dag_id,
         task_id=task_instance.task_id,
         logical_date=_get_logical_date(),
-        try_number=_get_try_number_success(),
+        try_number=task_instance.try_number,
         map_index=task_instance.map_index,
     )
diff --git a/providers/snowflake/src/airflow/providers/snowflake/version_compat.py b/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
index 21e7170194e36..48d122b669696 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro


-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
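With _get_try_number_success() gone, the Snowflake OpenLineage helper hands the adapter the raw attempt number: on Airflow 2.10+ try_number is stable across the task lifecycle, so no success-state adjustment is needed. A hedged sketch of the simplified call, with a mocked task instance standing in for the real one (the OpenLineageAdapter import path is assumed from the provider, not shown in this diff):

    from datetime import datetime, timezone
    from unittest.mock import MagicMock

    from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

    # Illustrative task instance; try_number is passed through untouched.
    ti = MagicMock(
        dag_id="dag_id",
        task_id="task_id",
        try_number=1,
        map_index=1,
        logical_date=datetime(2025, 1, 1, tzinfo=timezone.utc),
    )

    run_id = OpenLineageAdapter.build_task_instance_run_id(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        logical_date=ti.logical_date,
        try_number=ti.try_number,
        map_index=ti.map_index,
    )
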
diff --git a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
index 1ecaf75af1804..a6c94a7a383ca 100644
--- a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
+++ b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
@@ -44,8 +44,6 @@
 from airflow.utils import timezone
 from airflow.utils.state import TaskInstanceState

-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-

 @pytest.mark.parametrize(
     "source,target",
@@ -118,7 +116,7 @@ def test_get_ol_run_id_ti_success():
         dag_id="dag_id",
         task_id="task_id",
         map_index=1,
-        try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+        try_number=1,
         logical_date=logical_date,
         state=TaskInstanceState.SUCCESS,
     )
@@ -150,7 +148,7 @@ def test_get_parent_run_facet():
         dag_id="dag_id",
         task_id="task_id",
         map_index=1,
-        try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+        try_number=1,
         logical_date=logical_date,
         state=TaskInstanceState.SUCCESS,
     )
@@ -553,7 +551,7 @@ def test_emit_openlineage_events_for_snowflake_queries_without_hook(mock_now, mo
         dag_id="dag_id",
         task_id="task_id",
         map_index=1,
-        try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+        try_number=1,
         logical_date=logical_date,
         state=TaskInstanceState.SUCCESS,  # This will be query default state if no metadata found
     )
Skipping.") diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index ee9dde773fee2..bbe93dbc01a5e 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -22,7 +22,6 @@ from typing import TYPE_CHECKING, Any, NoReturn from airflow.providers.standard.triggers.temporal import DateTimeTrigger -from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS from airflow.sensors.base import BaseSensorOperator try: @@ -123,9 +122,7 @@ def __init__( def execute(self, context: Context) -> NoReturn: self.defer( - trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger) - if AIRFLOW_V_2_10_PLUS - else DateTimeTrigger(moment=self.target_datetime), + trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger), method_name="execute_complete", ) diff --git a/providers/standard/src/airflow/providers/standard/triggers/temporal.py b/providers/standard/src/airflow/providers/standard/triggers/temporal.py index 48ebb223cb6ca..5f7451a0e84f9 100644 --- a/providers/standard/src/airflow/providers/standard/triggers/temporal.py +++ b/providers/standard/src/airflow/providers/standard/triggers/temporal.py @@ -23,14 +23,9 @@ import pendulum -from airflow.exceptions import AirflowException -from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS -from airflow.triggers.base import BaseTrigger, TriggerEvent +from airflow.triggers.base import BaseTrigger, TaskSuccessEvent, TriggerEvent from airflow.utils import timezone -if AIRFLOW_V_2_10_PLUS: - from airflow.triggers.base import TaskSuccessEvent - class DateTimeTrigger(BaseTrigger): """ @@ -54,9 +49,6 @@ def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False) if moment.tzinfo is None: raise ValueError("You cannot pass naive datetimes") self.moment: pendulum.DateTime = timezone.convert_to_utc(moment) - if not AIRFLOW_V_2_10_PLUS and end_from_trigger: - raise AirflowException("end_from_trigger is only supported in Airflow 2.10 and later. 
") - self.end_from_trigger = end_from_trigger def serialize(self) -> tuple[str, dict[str, Any]]: diff --git a/providers/standard/src/airflow/providers/standard/version_compat.py b/providers/standard/src/airflow/providers/standard/version_compat.py index 21e7170194e36..48d122b669696 100644 --- a/providers/standard/src/airflow/providers/standard/version_compat.py +++ b/providers/standard/src/airflow/providers/standard/version_compat.py @@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/standard/tests/unit/standard/decorators/test_external_python.py b/providers/standard/tests/unit/standard/decorators/test_external_python.py index ef2007048f577..1b4bba68c2430 100644 --- a/providers/standard/tests/unit/standard/decorators/test_external_python.py +++ b/providers/standard/tests/unit/standard/decorators/test_external_python.py @@ -29,8 +29,6 @@ from airflow.decorators import setup, task, teardown from airflow.utils import timezone -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS - pytestmark = pytest.mark.db_test @@ -68,7 +66,6 @@ def venv_python_with_cloudpickle_and_dill(tmp_path_factory): class TestExternalPythonDecorator: - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -89,7 +86,6 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -115,7 +111,6 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -147,7 +142,6 @@ def f(): with pytest.raises(CalledProcessError): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -170,7 +164,6 @@ def f(a, b, c=False, d=False): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -191,7 +184,6 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -212,7 +204,6 @@ def f(_): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -239,7 +230,6 @@ def f(): assert setup_task.is_setup ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ @@ -266,7 +256,6 @@ def f(): assert teardown_task.is_teardown ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10") @pytest.mark.parametrize( "serializer", [ diff --git 
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py
index 4206db5481baa..45a1e395e1a16 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -70,7 +70,7 @@
 from airflow.utils.types import NOTSET, DagRunType

 from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

 if TYPE_CHECKING:
     from airflow.models.dagrun import DagRun
@@ -936,11 +936,10 @@ def test_virtualenv_serializable_context_fields(self, create_task_instance):
             "conn",  # Accessor for Connection.
             "map_index_template",
         }
-        if AIRFLOW_V_2_10_PLUS:
-            intentionally_excluded_context_keys |= {
-                "inlet_events",
-                "outlet_events",
-            }
+        intentionally_excluded_context_keys |= {
+            "inlet_events",
+            "outlet_events",
+        }

         ti = create_task_instance(dag_id=self.dag_id, task_id=self.task_id, schedule=None)
         context = ti.get_template_context()
diff --git a/providers/standard/tests/unit/standard/triggers/test_temporal.py b/providers/standard/tests/unit/standard/triggers/test_temporal.py
index 91e27298c5067..fc85eab8273dd 100644
--- a/providers/standard/tests/unit/standard/triggers/test_temporal.py
+++ b/providers/standard/tests/unit/standard/triggers/test_temporal.py
@@ -29,8 +29,6 @@
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.timezone import utcnow

-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-

 def test_input_validation():
     """
@@ -76,7 +74,6 @@ def test_timedelta_trigger_serialization():
     assert -2 < (kwargs["moment"] - expected_moment).total_seconds() < 2


-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Only for Airflow 2.10+")
 @pytest.mark.parametrize(
     "tz, end_from_trigger",
     [
@@ -116,44 +113,6 @@ async def test_datetime_trigger_timing_airflow_2_10_plus(tz, end_from_trigger):
     assert result.payload == expected_payload


-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="Only for Airflow < 2.10+")
-@pytest.mark.parametrize(
-    "tz",
-    [
-        timezone.parse_timezone("UTC"),
-        timezone.parse_timezone("Europe/Paris"),
-        timezone.parse_timezone("America/Toronto"),
-    ],
-)
-@pytest.mark.asyncio
-async def test_datetime_trigger_timing(tz):
-    """
-    Tests that the DateTimeTrigger only goes off on or after the appropriate
-    time.
-    """
-    past_moment = pendulum.instance((timezone.utcnow() - datetime.timedelta(seconds=60)).astimezone(tz))
-    future_moment = pendulum.instance((timezone.utcnow() + datetime.timedelta(seconds=60)).astimezone(tz))
-
-    # Create a task that runs the trigger for a short time then cancels it
-    trigger = DateTimeTrigger(future_moment)
-    trigger_task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
-
-    # It should not have produced a result
-    assert trigger_task.done() is False
-    trigger_task.cancel()
-
-    # Now, make one waiting for en event in the past and do it again
-    trigger = DateTimeTrigger(past_moment)
-    trigger_task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
-
-    assert trigger_task.done() is True
-    result = trigger_task.result()
-    assert isinstance(result, TriggerEvent)
-    assert result.payload == past_moment
-
-
 @mock.patch("airflow.providers.standard.triggers.temporal.timezone.utcnow")
 @mock.patch("airflow.providers.standard.triggers.temporal.asyncio.sleep")
 @pytest.mark.asyncio
diff --git a/providers/trino/src/airflow/providers/trino/version_compat.py b/providers/trino/src/airflow/providers/trino/version_compat.py
index 21e7170194e36..48d122b669696 100644
--- a/providers/trino/src/airflow/providers/trino/version_compat.py
+++ b/providers/trino/src/airflow/providers/trino/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro


-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)