diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 98e743ae42382..ee93b674ddd97 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -3034,7 +3034,9 @@ def _exceeds_max_active_runs(
             session=session,
         )
         active_non_backfill_runs = runs_dict.get(dag_model.dag_id, 0)
-        exceeds = active_non_backfill_runs >= dag_model.max_active_runs
+        exceeds = (
+            dag_model.max_active_runs is not None and active_non_backfill_runs >= dag_model.max_active_runs
+        )
 
         return exceeds, active_non_backfill_runs
 
diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py
index 3e5d3a95ec978..942b811ce6509 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -98,7 +98,7 @@ class Trigger(Base):
     encrypted_kwargs: Mapped[str] = mapped_column("kwargs", Text, nullable=False)
     created_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False)
     triggerer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
-    queue: Mapped[str] = mapped_column(String(256), nullable=True)
+    queue: Mapped[str | None] = mapped_column(String(256), nullable=True)
 
     triggerer_job = relationship(
         "Job",
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena_sql.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena_sql.py
index e12589a75d12d..ad6c815223b3e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena_sql.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena_sql.py
@@ -25,7 +25,7 @@
 try:
     from sqlalchemy.engine.url import URL
 except ImportError:
-    URL = None
+    URL = None  # type: ignore[assignment,misc]
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
index 33f2e4c83b2a8..4fc414cd28159 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
@@ -27,7 +27,7 @@
     from sqlalchemy import create_engine
     from sqlalchemy.engine.url import URL
 except ImportError:
-    URL = create_engine = None
+    URL = create_engine = None  # type: ignore[assignment,misc]
 
 from airflow.exceptions import AirflowOptionalProviderFeatureException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
index 0841030efdfac..8ab5e887c4c78 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
@@ -35,11 +35,11 @@
     from sqlalchemy.engine import make_url
     from sqlalchemy.exc import ArgumentError, NoSuchModuleError
 except ImportError:
-    create_engine = None
-    inspect = None
-    make_url = None
-    ArgumentError = Exception
-    NoSuchModuleError = Exception
+    create_engine = None  # type: ignore[assignment]
+    inspect = None  # type: ignore[assignment]
+    make_url = None  # type: ignore[assignment]
+    ArgumentError = Exception  # type: ignore[misc,assignment]
+    NoSuchModuleError = Exception  # type: ignore[misc,assignment]
 
 from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
 
diff --git a/providers/exasol/src/airflow/providers/exasol/hooks/exasol.py b/providers/exasol/src/airflow/providers/exasol/hooks/exasol.py
index cf2aa1629c96c..77ca004880917 100644
--- a/providers/exasol/src/airflow/providers/exasol/hooks/exasol.py
+++ b/providers/exasol/src/airflow/providers/exasol/hooks/exasol.py
@@ -28,7 +28,7 @@
 try:
     from sqlalchemy.engine import URL
 except ImportError:
-    URL = None
+    URL = None  # type: ignore[assignment,misc]
 
 from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
 from airflow.providers.common.sql.hooks.handlers import return_single_query_results
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index 1281e95a26019..d226abb8c74ef 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -208,7 +208,7 @@ def sample_callable(**kwargs):
         task_instance = create_task_instance(
             t,
             run_id=run_id,
-            dag_version_id=dagrun.created_dag_version_id,
+            dag_version_id=uuid.UUID(dagrun.created_dag_version_id),
         )
     else:
         task_instance = TaskInstance(t, run_id=run_id)  # type: ignore
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 426f61760bef7..97c4ce9160b61 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -256,7 +256,7 @@ def test_execute(self, dag_maker: DagMaker, session: Session) -> None:
         expected_params_in_trigger_kwargs: dict[str, dict[str, Any]]
 
         # trigger_kwargs are encoded via BaseSerialization in versions < 3.2
-        expected_ti_id = ti.id
+        expected_ti_id: str | UUID = ti.id
         if AIRFLOW_V_3_2_PLUS:
             expected_params_in_trigger_kwargs = expected_params
             # trigger_kwargs are encoded via serde from task sdk in versions >= 3.2
diff --git a/task-sdk/src/airflow/sdk/serde/__init__.py b/task-sdk/src/airflow/sdk/serde/__init__.py
index b02ad36313645..5716f2a9c625e 100644
--- a/task-sdk/src/airflow/sdk/serde/__init__.py
+++ b/task-sdk/src/airflow/sdk/serde/__init__.py
@@ -25,7 +25,7 @@
 from fnmatch import fnmatch
 from importlib import import_module
 from re import Pattern
-from typing import TYPE_CHECKING, Any, TypeVar, cast
+from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
 
 import attr
 
@@ -85,6 +85,14 @@ def decode(d: dict[str, Any]) -> tuple[str, int, Any]:
     return classname, version, data
 
 
+@overload
+def serialize(o: dict, depth: int = 0) -> dict: ...
+@overload
+def serialize(o: None, depth: int = 0) -> None: ...
+@overload
+def serialize(o: object, depth: int = 0) -> U | None: ...
+
+
 def serialize(o: object, depth: int = 0) -> U | None:
     """
     Serialize an object into a representation consisting only built-in types.
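
Reviewer note on the trigger.py hunk: with SQLAlchemy 2.0-style typed mappings, a column declared nullable=True should also admit None in its Python-side annotation, which is exactly what the change fixes. A minimal sketch of the rule, using a hypothetical QueueEntry model rather than Airflow's Trigger:

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class QueueEntry(Base):  # hypothetical model, for illustration only
    __tablename__ = "queue_entry"

    id: Mapped[int] = mapped_column(primary_key=True)
    # The column can hold NULL, so the annotation must admit None;
    # Mapped[str] would promise a non-optional str to type checkers.
    queue: Mapped[str | None] = mapped_column(String(256), nullable=True)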
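The `# type: ignore[assignment,misc]` additions in the hook files all serve the same optional-dependency pattern: keep the module importable when sqlalchemy is absent by rebinding the imported name to None, then fail at use time. A standalone sketch of that pattern (the build_url helper and its error message are illustrative, not code from this PR):

from __future__ import annotations

try:
    from sqlalchemy.engine.url import URL
except ImportError:
    # Rebinding a class name to None trips mypy's [assignment] (type mismatch)
    # and [misc] (assigning to a type) checks, hence the targeted ignore codes.
    URL = None  # type: ignore[assignment,misc]


def build_url(region: str) -> URL:
    if URL is None:
        # Defer the failure until the optional feature is actually exercised.
        raise RuntimeError("sqlalchemy is required to build connection URLs")
    return URL.create(drivername="awsathena+rest", host=f"athena.{region}.amazonaws.com")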
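The @overload stubs added in front of serialize exist only for static type checkers; the single def beneath them remains the runtime implementation, and callers passing a dict (or None) now get back dict (or None) instead of the wide U | None. The same technique on a toy to_builtin function (a sketch, not the task-sdk code):

from typing import Any, overload


@overload
def to_builtin(o: dict) -> dict: ...
@overload
def to_builtin(o: None) -> None: ...
@overload
def to_builtin(o: object) -> Any: ...


def to_builtin(o: object) -> Any:
    # Single runtime implementation; the stubs above let a checker infer
    # that a dict argument yields a dict and that None stays None.
    if o is None or isinstance(o, (bool, int, float, str)):
        return o
    if isinstance(o, dict):
        return {k: to_builtin(v) for k, v in o.items()}
    return str(o)


payload = to_builtin({"a": 1})  # checker infers dict, not Any
empty = to_builtin(None)        # checker infers None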