Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ dependencies = [
"libcst >=1.1.0",
"linkify-it-py>=2.0.0",
"lockfile>=0.12.2",
"marshmallow-oneofschema>=2.0.1",
"methodtools>=0.4.7",
"opentelemetry-api>=1.24.0",
"opentelemetry-exporter-otlp>=1.24.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class DAGResponse(BaseModel):
last_parsed_time: datetime | None
last_expired: datetime | None
bundle_name: str | None
bundle_version: str | None
relative_fileloc: str | None
fileloc: str
description: str | None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,11 @@ components:
- type: string
- type: 'null'
title: Bundle Name
bundle_version:
anyOf:
- type: string
- type: 'null'
title: Bundle Version
relative_fileloc:
anyOf:
- type: string
Expand Down Expand Up @@ -1140,6 +1145,7 @@ components:
- last_parsed_time
- last_expired
- bundle_name
- bundle_version
- relative_fileloc
- fileloc
- description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7736,6 +7736,11 @@ components:
- type: string
- type: 'null'
title: Bundle Name
bundle_version:
anyOf:
- type: string
- type: 'null'
title: Bundle Version
relative_fileloc:
anyOf:
- type: string
Expand Down Expand Up @@ -7899,6 +7904,7 @@ components:
- last_parsed_time
- last_expired
- bundle_name
- bundle_version
- relative_fileloc
- fileloc
- description
Expand Down Expand Up @@ -7974,6 +7980,11 @@ components:
- type: string
- type: 'null'
title: Bundle Name
bundle_version:
anyOf:
- type: string
- type: 'null'
title: Bundle Version
relative_fileloc:
anyOf:
- type: string
Expand Down Expand Up @@ -8062,6 +8073,7 @@ components:
- last_parsed_time
- last_expired
- bundle_name
- bundle_version
- relative_fileloc
- fileloc
- description
Expand Down
87 changes: 14 additions & 73 deletions airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,16 @@
from pathlib import Path
from typing import TYPE_CHECKING

from itsdangerous import URLSafeSerializer
from marshmallow import fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
from sqlalchemy import func, select

from airflow.api.client import get_current_api_client
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.configuration import conf
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.exceptions import AirflowException
from airflow.jobs.job import Job
from airflow.models import DagBag, DagModel, DagRun, DagTag, TaskInstance
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
Expand All @@ -61,66 +58,10 @@
from sqlalchemy.orm import Session

from airflow.timetables.base import DataInterval
log = logging.getLogger(__name__)

DAG_DETAIL_FIELDS = {*DAGResponse.model_fields, *DAGResponse.model_computed_fields}

# TODO: To clean up api_connexion, we need to move the below 2 classes to this file until migrated to FastAPI
class DagTagSchema(SQLAlchemySchema):
"""Dag Tag schema."""

class Meta:
"""Meta."""

model = DagTag

name = auto_field()


class DAGSchema(SQLAlchemySchema):
"""DAG schema."""

class Meta:
"""Meta."""

model = DagModel

dag_id = auto_field(dump_only=True)
dag_display_name = fields.String(attribute="dag_display_name", dump_only=True)
bundle_name = auto_field(dump_only=True)
bundle_version = auto_field(dump_only=True)
is_paused = auto_field()
is_stale = auto_field(dump_only=True)
last_parsed_time = auto_field(dump_only=True)
last_expired = auto_field(dump_only=True)
fileloc = auto_field(dump_only=True)
file_token = fields.Method("get_token", dump_only=True)
owners = fields.Method("get_owners", dump_only=True)
description = auto_field(dump_only=True)
timetable_summary = auto_field(dump_only=True)
timetable_description = auto_field(dump_only=True)
tags = fields.List(fields.Nested(DagTagSchema), dump_only=True)
max_active_tasks = auto_field(dump_only=True)
max_active_runs = auto_field(dump_only=True)
max_consecutive_failed_dag_runs = auto_field(dump_only=True)
has_task_concurrency_limits = auto_field(dump_only=True)
has_import_errors = auto_field(dump_only=True)
next_dagrun = auto_field(dump_only=True)
next_dagrun_data_interval_start = auto_field(dump_only=True)
next_dagrun_data_interval_end = auto_field(dump_only=True)
next_dagrun_create_after = auto_field(dump_only=True)

@staticmethod
def get_owners(obj: DagModel):
"""Convert owners attribute to DAG representation."""
if not getattr(obj, "owners", None):
return []
return obj.owners.split(",")

@staticmethod
def get_token(obj: DagModel):
"""Return file token."""
serializer = URLSafeSerializer(conf.get_mandatory_value("webserver", "secret_key"))
return serializer.dumps(obj.fileloc)
log = logging.getLogger(__name__)


@cli_utils.action_cli
Expand Down Expand Up @@ -295,6 +236,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"is_stale": dag.get_is_stale(),
"last_parsed_time": None,
"last_expired": None,
"relative_fileloc": dag.relative_fileloc,
"fileloc": dag.fileloc,
"file_token": None,
"owners": dag.owner,
Expand All @@ -309,10 +251,10 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
),
"has_import_errors": False,
"next_dagrun": None,
"next_dagrun_data_interval_start": None,
"next_dagrun_data_interval_end": None,
"next_dagrun_create_after": None,
"next_dagrun_logical_date": None,
"next_dagrun_run_after": None,
}


Expand Down Expand Up @@ -391,16 +333,13 @@ def print_execution_interval(interval: DataInterval | None):
def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
"""Display dags with or without stats at the command line."""
cols = args.columns if args.columns else []
dag_schema_fields = DAGSchema().fields
invalid_cols = [c for c in cols if c not in dag_schema_fields]
valid_cols = [c for c in cols if c in dag_schema_fields]

if invalid_cols:
if invalid_cols := [c for c in cols if c not in DAG_DETAIL_FIELDS]:
from rich import print as rich_print

rich_print(
f"[red][bold]Error:[/bold] Ignoring the following invalid columns: {invalid_cols}. "
f"List of valid columns: {list(dag_schema_fields.keys())}",
f"List of valid columns: {sorted(DAG_DETAIL_FIELDS)}",
file=sys.stderr,
)

Expand All @@ -426,10 +365,12 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
def get_dag_detail(dag: DAG) -> dict:
dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
if dag_model:
dag_detail = DAGSchema().dump(dag_model)
dag_detail = DAGResponse.from_orm(dag_model).model_dump()
else:
dag_detail = _get_dagbag_dag_details(dag)
return {col: dag_detail[col] for col in valid_cols}
if not cols:
return dag_detail
return {col: dag_detail[col] for col in cols if col in DAG_DETAIL_FIELDS}

def filter_dags_by_bundle(dags: list[DAG], bundle_names: list[str] | None) -> list[DAG]:
"""Filter DAGs based on the specified bundle name, if provided."""
Expand Down Expand Up @@ -458,7 +399,7 @@ def dag_details(args, session: Session = NEW_SESSION):
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
dag_detail = DAGSchema().dump(dag)
dag_detail = DAGResponse.from_orm(dag).model_dump()

if args.output in ["table", "plain"]:
data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()]
Expand Down
36 changes: 36 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,17 @@ export const $DAGDetailsResponse = {
],
title: "Bundle Name",
},
bundle_version: {
anyOf: [
{
type: "string",
},
{
type: "null",
},
],
title: "Bundle Version",
},
relative_fileloc: {
anyOf: [
{
Expand Down Expand Up @@ -1737,6 +1748,7 @@ export const $DAGDetailsResponse = {
"last_parsed_time",
"last_expired",
"bundle_name",
"bundle_version",
"relative_fileloc",
"fileloc",
"description",
Expand Down Expand Up @@ -1840,6 +1852,17 @@ export const $DAGResponse = {
],
title: "Bundle Name",
},
bundle_version: {
anyOf: [
{
type: "string",
},
{
type: "null",
},
],
title: "Bundle Version",
},
relative_fileloc: {
anyOf: [
{
Expand Down Expand Up @@ -1993,6 +2016,7 @@ export const $DAGResponse = {
"last_parsed_time",
"last_expired",
"bundle_name",
"bundle_version",
"relative_fileloc",
"fileloc",
"description",
Expand Down Expand Up @@ -6070,6 +6094,17 @@ export const $DAGWithLatestDagRunsResponse = {
],
title: "Bundle Name",
},
bundle_version: {
anyOf: [
{
type: "string",
},
{
type: "null",
},
],
title: "Bundle Version",
},
relative_fileloc: {
anyOf: [
{
Expand Down Expand Up @@ -6242,6 +6277,7 @@ export const $DAGWithLatestDagRunsResponse = {
"last_parsed_time",
"last_expired",
"bundle_name",
"bundle_version",
"relative_fileloc",
"fileloc",
"description",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ export type DAGDetailsResponse = {
last_parsed_time: string | null;
last_expired: string | null;
bundle_name: string | null;
bundle_version: string | null;
relative_fileloc: string | null;
fileloc: string;
description: string | null;
Expand Down Expand Up @@ -502,6 +503,7 @@ export type DAGResponse = {
last_parsed_time: string | null;
last_expired: string | null;
bundle_name: string | null;
bundle_version: string | null;
relative_fileloc: string | null;
fileloc: string;
description: string | null;
Expand Down Expand Up @@ -1534,6 +1536,7 @@ export type DAGWithLatestDagRunsResponse = {
last_parsed_time: string | null;
last_expired: string | null;
bundle_name: string | null;
bundle_version: string | null;
relative_fileloc: string | null;
fileloc: string;
description: string | null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { DagCard } from "./DagCard";
const mockDag = {
asset_expression: null,
bundle_name: "dags-folder",
bundle_version: "1",
dag_display_name: "nested_groups",
dag_id: "nested_groups",
description: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ def test_dag_details(
file_token = res_json["file_token"]
expected = {
"bundle_name": "dag_maker",
"bundle_version": None,
"asset_expression": None,
"catchup": False,
"concurrency": 16,
Expand Down Expand Up @@ -516,6 +517,7 @@ def test_get_dag(self, test_client, query_params, dag_id, expected_status_code,
"timetable_description": "Never, external triggers only",
"has_import_errors": False,
"bundle_name": "dag_maker",
"bundle_version": None,
"relative_fileloc": "test_dags.py",
}
assert res_json == expected
Expand Down
7 changes: 2 additions & 5 deletions airflow-core/tests/unit/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,8 @@ def test_cli_get_dag_details(self):
dag_command.dag_details(args)
out = temp_stdout.getvalue()

dag_detail_fields = dag_command.DAGSchema().fields.keys()

# Check if DAG Details field are present
for field in dag_detail_fields:
for field in dag_command.DAG_DETAIL_FIELDS:
assert field in out

# Check if identifying values are present
Expand Down Expand Up @@ -309,10 +307,9 @@ def test_list_dags_none_get_dagmodel(self, mock_get_dagmodel):

@conf_vars({("core", "load_examples"): "true"})
def test_dagbag_dag_col(self):
valid_cols = [c for c in dag_command.DAGSchema().fields]
dagbag = DagBag(include_examples=True, read_dags_from_db=True)
dag_details = dag_command._get_dagbag_dag_details(dagbag.get_dag("tutorial_dag"))
assert list(dag_details.keys()) == valid_cols
assert sorted(dag_details) == sorted(dag_command.DAG_DETAIL_FIELDS)

@conf_vars({("core", "load_examples"): "false"})
def test_cli_list_import_errors(self, get_test_dag, configure_testing_dag_bundle, caplog):
Expand Down
1 change: 1 addition & 0 deletions providers/amazon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dependencies = [
# We can remove it after https://github.com/xmlsec/python-xmlsec/issues/344 is fixed
"xmlsec!=1.3.15,>=1.3.14",
"sagemaker-studio>=1.0.9",
"marshmallow>=3",
]

# The optional dependencies should be modified in place in the generated file
Expand Down