4 changes: 3 additions & 1 deletion airflow-core/.pre-commit-config.yaml
@@ -375,7 +375,9 @@ repos:
         ^src/airflow/plugins_manager\.py$|
         ^src/airflow/providers_manager\.py$|
         ^src/airflow/secrets/__init__.py$|
-        ^src/airflow/serialization/definitions/[_a-z]+\.py$|
+        ^src/airflow/serialization/decoders\.py$|
+        ^src/airflow/serialization/definitions/[_/a-z]+\.py$|
+        ^src/airflow/serialization/encoders\.py$|
         ^src/airflow/serialization/enums\.py$|
         ^src/airflow/serialization/helpers\.py$|
         ^src/airflow/serialization/serialized_objects\.py$|
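A quick way to see what the character-class change buys: adding "/" to the class lets the exclude pattern cover nested modules under definitions/, not only top-level ones. A minimal sketch (the nested path is illustrative):

import re

# Old pattern: only top-level modules under definitions/ matched.
old = re.compile(r"^src/airflow/serialization/definitions/[_a-z]+\.py$")
# New pattern: "/" in the character class also admits nested modules.
new = re.compile(r"^src/airflow/serialization/definitions/[_/a-z]+\.py$")

assert old.match("src/airflow/serialization/definitions/assets.py")
assert new.match("src/airflow/serialization/definitions/assets.py")
assert not old.match("src/airflow/serialization/definitions/sub/assets.py")
assert new.match("src/airflow/serialization/definitions/sub/assets.py")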
23 changes: 12 additions & 11 deletions airflow-core/src/airflow/cli/commands/dag_command.py
@@ -237,16 +237,18 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None:
print(f"File {filename} saved")


def _get_dagbag_dag_details(dag: DAG, session: Session) -> dict:
def _get_dagbag_dag_details(dag: DAG) -> dict:
"""Return a dagbag dag details dict."""
dag_model: DagModel | None = session.get(DagModel, dag.dag_id)
from airflow.serialization.encoders import coerce_to_core_timetable

core_timetable = coerce_to_core_timetable(dag.timetable)
return {
"dag_id": dag.dag_id,
"dag_display_name": dag.dag_display_name,
"bundle_name": dag_model.bundle_name if dag_model else None,
"bundle_version": dag_model.bundle_version if dag_model else None,
"is_paused": dag_model.is_paused if dag_model else None,
"is_stale": dag_model.is_stale if dag_model else None,
"bundle_name": None,
"bundle_version": None,
"is_paused": None,
"is_stale": None,
"last_parsed_time": None,
"last_parse_duration": None,
"last_expired": None,
@@ -255,8 +257,8 @@ def _get_dagbag_dag_details(dag: DAG, session: Session) -> dict:
"file_token": None,
"owners": dag.owner,
"description": dag.description,
"timetable_summary": dag.timetable.summary,
"timetable_description": dag.timetable.description,
"timetable_summary": core_timetable.summary,
"timetable_description": core_timetable.description,
"tags": dag.tags,
"max_active_tasks": dag.max_active_tasks,
"max_active_runs": dag.max_active_runs,
@@ -401,11 +403,10 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
     )

     def get_dag_detail(dag: DAG) -> dict:
-        dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
-        if dag_model:
+        if dag_model := DagModel.get_dagmodel(dag.dag_id, session=session):
             dag_detail = DAGResponse.model_validate(dag_model, from_attributes=True).model_dump()
         else:
-            dag_detail = _get_dagbag_dag_details(dag, session)
+            dag_detail = _get_dagbag_dag_details(dag)
         if not cols:
             return dag_detail
         return {col: dag_detail[col] for col in cols if col in DAG_DETAIL_FIELDS}
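The session parameter disappears because _get_dagbag_dag_details no longer touches the metadata database: DB-backed fields are reported as None, and the timetable fields come from the SDK timetable coerced into a core timetable. A hedged sketch of that coercion step, relying only on the attributes the diff itself reads (coerce_to_core_timetable is introduced by this PR's encoders module; its internals are not shown here):

from airflow.serialization.encoders import coerce_to_core_timetable

def timetable_fields(dag) -> dict:
    # The SDK-level timetable may not expose summary/description directly,
    # so it is first coerced to a core Timetable (assumed from its usage above).
    core_timetable = coerce_to_core_timetable(dag.timetable)
    return {
        "timetable_summary": core_timetable.summary,
        "timetable_description": core_timetable.description,
    }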
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/collection.py
@@ -934,7 +934,7 @@ def add_task_asset_references(
     def add_asset_trigger_references(
         self, assets: dict[tuple[str, str], AssetModel], *, session: Session
     ) -> None:
-        from airflow.serialization.serialized_objects import _encode_trigger
+        from airflow.serialization.encoders import encode_trigger

         # Update references from assets being used
         refs_to_add: dict[tuple[str, str], set[int]] = {}
@@ -948,7 +948,7 @@ def add_asset_trigger_references(
             # If the asset belongs to a DAG that is not active or is paused, consider there is no watcher associated with it
             asset_watcher_triggers = (
                 [
-                    {**_encode_trigger(watcher.trigger), "watcher_name": watcher.name}
+                    {**encode_trigger(watcher.trigger), "watcher_name": watcher.name}
                     for watcher in asset.watchers
                 ]
                 if name_uri in active_assets
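The rename is mechanical (the private _encode_trigger becomes a public encode_trigger in the new encoders module), but the merge pattern is worth spelling out: the encoded trigger dict is copied and a watcher_name key is layered on top. A sketch with an assumed encoding shape; "classpath"/"kwargs" is inferred from decode_asset later in this PR, and the classpath is hypothetical:

encoded = {"classpath": "example.triggers.FileTrigger", "kwargs": {"path": "/tmp/x"}}
record = {**encoded, "watcher_name": "my_watcher"}
assert record == {
    "classpath": "example.triggers.FileTrigger",
    "kwargs": {"path": "/tmp/x"},
    "watcher_name": "my_watcher",
}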
4 changes: 1 addition & 3 deletions airflow-core/src/airflow/example_dags/example_assets.py
@@ -56,9 +56,7 @@
 import pendulum

 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk import DAG, Asset
-from airflow.timetables.assets import AssetOrTimeSchedule
-from airflow.timetables.trigger import CronTriggerTimetable
+from airflow.sdk import DAG, Asset, AssetOrTimeSchedule, CronTriggerTimetable

 dag1_asset = Asset("s3://dag1/output_1.txt", extra={"hi": "bye"})
 dag2_asset = Asset("s3://dag2/output_1.txt", extra={"hi": "bye"})
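With the consolidated import, the example DAG's asset-or-time schedule can be built from airflow.sdk alone. A hedged sketch; the exact AssetOrTimeSchedule signature is assumed from its pre-move counterpart, and the cron expression is illustrative:

from airflow.sdk import Asset, AssetOrTimeSchedule, CronTriggerTimetable

schedule = AssetOrTimeSchedule(
    timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
    assets=(Asset("s3://dag1/output_1.txt") | Asset("s3://dag2/output_1.txt")),
)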
52 changes: 23 additions & 29 deletions airflow-core/src/airflow/exceptions.py
@@ -34,27 +34,25 @@
     from airflow.sdk.exceptions import (
         AirflowException,
         AirflowNotFoundException,
-        AirflowRescheduleException,
-        TaskNotFound,
+        AirflowRescheduleException as AirflowRescheduleException,
+        AirflowTimetableInvalid as AirflowTimetableInvalid,
+        TaskNotFound as TaskNotFound,
     )
 except ModuleNotFoundError:
     # When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed.
     # In that case, we define fallback exception classes that mirror the SDK ones.
     class AirflowException(Exception):  # type: ignore[no-redef]
         """Base exception for Airflow errors."""

-        pass
-
     class AirflowNotFoundException(AirflowException):  # type: ignore[no-redef]
         """Raise when a requested object is not found."""

-        pass
+    class AirflowTimetableInvalid(AirflowException):  # type: ignore[no-redef]
+        """Raise when a DAG has an invalid timetable."""

     class TaskNotFound(AirflowException):  # type: ignore[no-redef]
         """Raise when a Task is not available in the system."""

-        pass
-
     class AirflowRescheduleException(AirflowException):  # type: ignore[no-redef]
         """
         Raise when the task should be re-scheduled at a later time.
@@ -120,10 +118,6 @@ class AirflowClusterPolicyError(AirflowException):
"""Raise for a Cluster Policy other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag."""


class AirflowTimetableInvalid(AirflowException):
"""Raise when a DAG has an invalid timetable."""


class DagNotFound(AirflowNotFoundException):
"""Raise when a DAG is not available in the system."""

@@ -308,23 +302,23 @@ class AirflowClearRunningTaskException(AirflowException):


 _DEPRECATED_EXCEPTIONS = {
-    "AirflowTaskTerminated": "airflow.sdk.exceptions.AirflowTaskTerminated",
-    "DuplicateTaskIdFound": "airflow.sdk.exceptions.DuplicateTaskIdFound",
-    "FailFastDagInvalidTriggerRule": "airflow.sdk.exceptions.FailFastDagInvalidTriggerRule",
-    "TaskAlreadyInTaskGroup": "airflow.sdk.exceptions.TaskAlreadyInTaskGroup",
-    "TaskDeferralTimeout": "airflow.sdk.exceptions.TaskDeferralTimeout",
-    "XComNotFound": "airflow.sdk.exceptions.XComNotFound",
-    "DownstreamTasksSkipped": "airflow.sdk.exceptions.DownstreamTasksSkipped",
-    "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
-    "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
-    "TaskDeferralError": "airflow.sdk.exceptions.TaskDeferralError",
-    "AirflowDagCycleException": "airflow.sdk.exceptions.AirflowDagCycleException",
-    "AirflowInactiveAssetInInletOrOutletException": "airflow.sdk.exceptions.AirflowInactiveAssetInInletOrOutletException",
-    "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
-    "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
-    "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
-    "ParamValidationError": "airflow.sdk.exceptions.ParamValidationError",
-    "TaskDeferred": "airflow.sdk.exceptions.TaskDeferred",
+    "AirflowDagCycleException",
+    "AirflowFailException",
+    "AirflowInactiveAssetInInletOrOutletException",
+    "AirflowSensorTimeout",
+    "AirflowSkipException",
+    "AirflowTaskTerminated",
+    "AirflowTaskTimeout",
+    "DagRunTriggerException",
+    "DownstreamTasksSkipped",
+    "DuplicateTaskIdFound",
+    "FailFastDagInvalidTriggerRule",
+    "ParamValidationError",
+    "TaskAlreadyInTaskGroup",
+    "TaskDeferralError",
+    "TaskDeferralTimeout",
+    "TaskDeferred",
+    "XComNotFound",
 }


@@ -336,7 +330,7 @@ def __getattr__(name: str):
     from airflow import DeprecatedImportWarning
     from airflow.utils.module_loading import import_string

-    target_path = _DEPRECATED_EXCEPTIONS[name]
+    target_path = f"airflow.sdk.exceptions.{name}"
     warnings.warn(
         f"airflow.exceptions.{name} is deprecated and will be removed in a future version. Use {target_path} instead.",
         DeprecatedImportWarning,
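Because every deprecated name now lives at airflow.sdk.exceptions.<name>, the name-to-path mapping collapses into a plain set and the target path is rebuilt with an f-string. The surrounding __getattr__ is the PEP 562 module-level attribute hook; a minimal self-contained sketch of the same pattern (module and class names hypothetical):

import warnings

_MOVED = {"OldError"}  # names relocated to new_home (a hypothetical module)

def __getattr__(name: str):
    # Called only when normal module attribute lookup fails (PEP 562).
    if name not in _MOVED:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    warnings.warn(
        f"{__name__}.{name} is deprecated; use new_home.{name} instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    from importlib import import_module

    return getattr(import_module("new_home"), name)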
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -937,7 +937,7 @@ async def create_triggers(self):
             await asyncio.sleep(0)

             try:
-                from airflow.serialization.serialized_objects import smart_decode_trigger_kwargs
+                from airflow.serialization.decoders import smart_decode_trigger_kwargs

                 # Decrypt and clean trigger kwargs before execution
                 # Note: We only clean up serialization artifacts (__var, __type keys) here,
132 changes: 132 additions & 0 deletions airflow-core/src/airflow/serialization/decoders.py
@@ -0,0 +1,132 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, Any, TypeVar

import dateutil.relativedelta

from airflow.sdk import (  # TODO: Implement serialized assets.
    Asset,
    AssetAlias,
    AssetAll,
    AssetAny,
)
from airflow.serialization.definitions.assets import SerializedAssetWatcher
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.helpers import find_registered_custom_timetable, is_core_timetable_import_path
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
    from airflow.sdk.definitions.asset import BaseAsset
    from airflow.timetables.base import Timetable as CoreTimetable

R = TypeVar("R")


def decode_relativedelta(var: dict[str, Any]) -> dateutil.relativedelta.relativedelta:
    """Decode a relativedelta object."""
    if "weekday" in var:
        var["weekday"] = dateutil.relativedelta.weekday(*var["weekday"])
    return dateutil.relativedelta.relativedelta(**var)


def decode_interval(value: int | dict) -> datetime.timedelta | dateutil.relativedelta.relativedelta:
    if isinstance(value, dict):
        return decode_relativedelta(value)
    return datetime.timedelta(seconds=value)


def decode_run_immediately(value: bool | float) -> bool | datetime.timedelta:
    if isinstance(value, float):
        return datetime.timedelta(seconds=value)
    return value


def smart_decode_trigger_kwargs(d):
    """
    Slightly clean up kwargs for display or execution.

    This detects one level of BaseSerialization and tries to deserialize the
    content, removing the __type/__var artifacts when the value is displayed
    to the user in the UI or used during execution.
    """
    from airflow.serialization.serialized_objects import BaseSerialization

    if not isinstance(d, dict) or Encoding.TYPE not in d:
        return d
    return BaseSerialization.deserialize(d)


def decode_asset(var: dict[str, Any]):
    watchers = var.get("watchers", [])
    return Asset(
        name=var["name"],
        uri=var["uri"],
        group=var["group"],
        extra=var["extra"],
        watchers=[
            SerializedAssetWatcher(
                name=watcher["name"],
                trigger={
                    "classpath": watcher["trigger"]["classpath"],
                    "kwargs": smart_decode_trigger_kwargs(watcher["trigger"]["kwargs"]),
                },
            )
            for watcher in watchers
        ],
    )


def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
    """
    Decode a previously serialized asset condition.

    :meta private:
    """
    match var["__type"]:
        case DAT.ASSET:
            return decode_asset(var)
        case DAT.ASSET_ALL:
            return AssetAll(*(decode_asset_condition(x) for x in var["objects"]))
        case DAT.ASSET_ANY:
            return AssetAny(*(decode_asset_condition(x) for x in var["objects"]))
        case DAT.ASSET_ALIAS:
            return AssetAlias(name=var["name"], group=var["group"])
        case DAT.ASSET_REF:
            return Asset.ref(**{k: v for k, v in var.items() if k != "__type"})
        case data_type:
            raise ValueError(f"deserialization not implemented for DAT {data_type!r}")


def decode_timetable(var: dict[str, Any]) -> CoreTimetable:
    """
    Decode a previously serialized timetable.

    Most of the deserialization logic is delegated to the actual type, which
    we import from string.

    :meta private:
    """
    if is_core_timetable_import_path(importable_string := var[Encoding.TYPE]):
        timetable_type: type[CoreTimetable] = import_string(importable_string)
    else:
        timetable_type = find_registered_custom_timetable(importable_string)
    return timetable_type.deserialize(var[Encoding.VAR])
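A hedged round-trip sketch for the interval decoders above. The encoded forms, an int of seconds for a timedelta, or a dict of relativedelta fields with "weekday" stored as an argument list, are inferred from the decoder bodies themselves:

import datetime

import dateutil.relativedelta

from airflow.serialization.decoders import decode_interval, decode_relativedelta

assert decode_interval(90) == datetime.timedelta(seconds=90)

# weekday is stored as e.g. [0] and rebuilt via dateutil's weekday(0) == MO.
rd = decode_relativedelta({"years": 1, "weekday": [0]})
assert rd == dateutil.relativedelta.relativedelta(
    years=1, weekday=dateutil.relativedelta.MO
)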
27 changes: 27 additions & 0 deletions airflow-core/src/airflow/serialization/definitions/assets.py
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow.sdk import AssetWatcher # TODO: Implement serialized assets.


class SerializedAssetWatcher(AssetWatcher):
    """JSON serializable representation of an asset watcher."""

    trigger: dict
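The override narrows trigger from a live trigger object to a JSON-able dict, which is exactly how decode_asset in this PR constructs it. An illustrative instantiation mirroring that call (the classpath is hypothetical):

watcher = SerializedAssetWatcher(
    name="wait_for_file",
    trigger={"classpath": "example.triggers.FileTrigger", "kwargs": {"path": "/tmp/x"}},
)
assert isinstance(watcher.trigger, dict)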