2 changes: 0 additions & 2 deletions .github/CODEOWNERS
@@ -29,9 +29,7 @@
/airflow/ui/ @bbovenzi @pierrejeambrun @ryanahamilton @jscheffl

# Security/Permissions
/airflow/api_connexion/security.py @vincbeck
/airflow/security/permissions.py @vincbeck
/airflow/www/security.py @vincbeck

# Calendar/Timetables
/airflow/timetables/ @uranusjr
1 change: 0 additions & 1 deletion LICENSE
@@ -220,7 +220,6 @@ at 3rd-party-licenses/LICENSE-[project].txt.
(ALv2 License) hue v4.3.0 (https://github.com/cloudera/hue/)
(ALv2 License) jqclock v2.3.0 (https://github.com/JohnRDOrazio/jQuery-Clock-Plugin)
(ALv2 License) bootstrap3-typeahead v4.0.2 (https://github.com/bassjobsen/Bootstrap-3-Typeahead)
(ALv2 License) connexion v2.7.0 (https://github.com/zalando/connexion)

========================================================================
MIT licenses
16 changes: 0 additions & 16 deletions airflow/api_connexion/__init__.py

This file was deleted.

16 changes: 0 additions & 16 deletions airflow/api_connexion/schemas/__init__.py

This file was deleted.

86 changes: 0 additions & 86 deletions airflow/api_connexion/schemas/dag_schema.py

This file was deleted.

78 changes: 71 additions & 7 deletions airflow/cli/commands/remote_commands/dag_command.py
@@ -29,16 +29,19 @@
import sys
from typing import TYPE_CHECKING

from itsdangerous import URLSafeSerializer
from marshmallow import fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
from sqlalchemy import func, select

from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.configuration import conf
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.exceptions import AirflowException
from airflow.jobs.job import Job
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models import DagBag, DagModel, DagRun, DagTag, TaskInstance
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager
@@ -59,6 +62,66 @@
log = logging.getLogger(__name__)


# TODO: To clean up api_connexion, the two classes below were moved into this file; remove them once the CLI is migrated to FastAPI
class DagTagSchema(SQLAlchemySchema):
"""Dag Tag schema."""

class Meta:
"""Meta."""

model = DagTag

name = auto_field()


class DAGSchema(SQLAlchemySchema):
"""DAG schema."""

class Meta:
"""Meta."""

model = DagModel

dag_id = auto_field(dump_only=True)
dag_display_name = fields.String(attribute="dag_display_name", dump_only=True)
bundle_name = auto_field(dump_only=True)
bundle_version = auto_field(dump_only=True)
is_paused = auto_field()
is_active = auto_field(dump_only=True)
last_parsed_time = auto_field(dump_only=True)
last_expired = auto_field(dump_only=True)
default_view = auto_field(dump_only=True)
fileloc = auto_field(dump_only=True)
file_token = fields.Method("get_token", dump_only=True)
owners = fields.Method("get_owners", dump_only=True)
description = auto_field(dump_only=True)
timetable_summary = auto_field(dump_only=True)
timetable_description = auto_field(dump_only=True)
tags = fields.List(fields.Nested(DagTagSchema), dump_only=True)
max_active_tasks = auto_field(dump_only=True)
max_active_runs = auto_field(dump_only=True)
max_consecutive_failed_dag_runs = auto_field(dump_only=True)
has_task_concurrency_limits = auto_field(dump_only=True)
has_import_errors = auto_field(dump_only=True)
next_dagrun = auto_field(dump_only=True)
next_dagrun_data_interval_start = auto_field(dump_only=True)
next_dagrun_data_interval_end = auto_field(dump_only=True)
next_dagrun_create_after = auto_field(dump_only=True)

@staticmethod
def get_owners(obj: DagModel):
"""Convert owners attribute to DAG representation."""
if not getattr(obj, "owners", None):
return []
return obj.owners.split(",")

@staticmethod
def get_token(obj: DagModel):
"""Return file token."""
serializer = URLSafeSerializer(conf.get_mandatory_value("webserver", "secret_key"))
return serializer.dumps(obj.fileloc)


@cli_utils.action_cli
@providers_configuration_loaded
def dag_trigger(args) -> None:
@@ -329,15 +392,16 @@ def print_execution_interval(interval: DataInterval | None):
def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
"""Display dags with or without stats at the command line."""
cols = args.columns if args.columns else []
invalid_cols = [c for c in cols if c not in dag_schema.fields]
valid_cols = [c for c in cols if c in dag_schema.fields]
dag_schema_fields = DAGSchema().fields
invalid_cols = [c for c in cols if c not in dag_schema_fields]
valid_cols = [c for c in cols if c in dag_schema_fields]

if invalid_cols:
from rich import print as rich_print

rich_print(
f"[red][bold]Error:[/bold] Ignoring the following invalid columns: {invalid_cols}. "
f"List of valid columns: {list(dag_schema.fields.keys())}",
f"List of valid columns: {list(dag_schema_fields.keys())}",
file=sys.stderr,
)

@@ -363,7 +427,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
def get_dag_detail(dag: DAG) -> dict:
dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
if dag_model:
dag_detail = dag_schema.dump(dag_model)
dag_detail = DAGSchema().dump(dag_model)
else:
dag_detail = _get_dagbag_dag_details(dag)
return {col: dag_detail[col] for col in valid_cols}
@@ -395,7 +459,7 @@ def dag_details(args, session: Session = NEW_SESSION):
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
dag_detail = dag_schema.dump(dag)
dag_detail = DAGSchema().dump(dag)

if args.output in ["table", "plain"]:
data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()]
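The relocated schema serves two purposes in the CLI, as the hunks above show: `DAGSchema().fields` defines the set of valid `--columns` names for `airflow dags list`, and `get_token` signs the DAG's file location with the webserver `secret_key`. A minimal, self-contained sketch of both mechanisms, using a stand-in plain-marshmallow schema and a placeholder secret rather than the real Airflow classes and config:

```python
from itsdangerous import URLSafeSerializer
from marshmallow import Schema, fields


class MiniDagSchema(Schema):
    """Stand-in for DAGSchema; the real class uses SQLAlchemySchema/auto_field."""

    dag_id = fields.String(dump_only=True)
    is_paused = fields.Boolean()


# Schema().fields maps field names to Field instances; dag_list_dags checks
# the requested columns against exactly this mapping.
print(sorted(MiniDagSchema().fields))  # ['dag_id', 'is_paused']

# get_token signs the DAG fileloc with the webserver secret_key so the token
# can be verified later (itsdangerous raises BadSignature on tampering).
serializer = URLSafeSerializer("placeholder-secret-key")
token = serializer.dumps("/dags/example_dag.py")
assert serializer.loads(token) == "/dags/example_dag.py"
```

Note the diff also swaps the old module-level `dag_schema` singleton for per-call `DAGSchema()` instantiation, which keeps the CLI free of any `api_connexion` import.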
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml
@@ -924,7 +924,7 @@ logging:
consoles\.
version_added: 2.0.0
type: string
example: "connexion,sqlalchemy"
example: "fastapi,sqlalchemy"
default: ""
worker_log_server_port:
description: |
2 changes: 1 addition & 1 deletion contributing-docs/testing/unit_tests.rst
@@ -95,7 +95,7 @@ test types you want to use in various ``breeze testing`` sub-commands in three w
Those test types are defined:

* ``Always`` - those are tests that should be always executed (always sub-folder)
* ``API`` - Tests for the Airflow API (api, api_connexion, api_internal, api_fastapi sub-folders)
* ``API`` - Tests for the Airflow API (api, api_internal, api_fastapi sub-folders)
* ``CLI`` - Tests for the Airflow CLI (cli folder)
* ``Core`` - for the core Airflow functionality (core, executors, jobs, models, ti_deps, utils sub-folders)
* ``Operators`` - tests for the operators (operators folder)
6 changes: 3 additions & 3 deletions dev/README_RELEASE_AIRFLOW.md
@@ -261,7 +261,7 @@ pipx install -e ./dev/breeze
git reset --hard origin/v${VERSION_BRANCH}-test
```

- Set your version in `airflow/__init__.py`, `airflow/api_connexion/openapi/v1.yaml` (without the RC tag).
- Set your version in `airflow/__init__.py` (without the RC tag).
- Run `git commit` without a message to update versions in `docs`.
- Add supported Airflow version to `./scripts/ci/pre_commit/supported_versions.py` and let pre-commit do the job again.
- Replace the versions in `README.md` about installation and verify that installation instructions work fine.
@@ -1039,7 +1039,7 @@ EOF
This includes:

- Modify `./scripts/ci/pre_commit/supported_versions.py` and let pre-commit do the job.
- For major/minor release, update version in `airflow/__init__.py`, `docs/docker-stack/` and `airflow/api_connexion/openapi/v1.yaml` to the next likely minor version release.
- For major/minor release, update version in `airflow/__init__.py` and `docs/docker-stack/` to the next likely minor version release.
- Sync `RELEASE_NOTES.rst` (including deleting relevant `newsfragments`) and `README.md` changes.
- Updating `Dockerfile` with the new version.
- Updating `airflow_bug_report.yml` issue template in `.github/ISSUE_TEMPLATE/` with the new version.
@@ -1081,7 +1081,7 @@ Clients can be found here:
### API Clients versioning policy

Clients and Core versioning are completely decoupled. Clients also follow SemVer and are updated when core introduces changes relevant to the clients.
Most of the time, if the [openapi specification](https://github.com/apache/airflow/blob/main/airflow/api_connexion/openapi/v1.yaml) has
Most of the time, if the [openapi specification](https://github.com/apache/airflow/blob/main/clients/python/openapi_v1.yaml) has
changed, clients need to be released.

To determine if you should release API clients, you can run from the airflow repository:
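The exact command is collapsed in this diff, but the companion change in `dev/README_RELEASE_PYTHON_CLIENT.md` just below shows the shape of the check: a `git log` over `clients/python/openapi_v1.yaml` since the previous release. A sketch of the same check in Python, for consistency with the rest of this PR (the `2.8.0` tag is the example used below; substitute the actual previous client release):

```python
# Sketch of the client-release check described above: list commits since the
# previous release that touched the Python client's OpenAPI spec. Any output
# suggests the API clients should be released.
import subprocess

log = subprocess.run(
    ["git", "log", "2.8.0..HEAD", "--pretty=oneline", "--", "clients/python/openapi_v1.yaml"],
    capture_output=True,
    text=True,
    check=True,
).stdout
print("clients need a release" if log.strip() else "no client-facing spec changes")
```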
2 changes: 1 addition & 1 deletion dev/README_RELEASE_PYTHON_CLIENT.md
@@ -92,7 +92,7 @@ echo "${VERSION}" > clients/python/version.txt

```shell script
cd ${AIRFLOW_REPO_ROOT}
git log 2.8.0..HEAD --pretty=oneline -- airflow/api_connexion/openapi/v1.yaml
git log 2.8.0..HEAD --pretty=oneline -- clients/python/openapi_v1.yaml
```

- Update CHANGELOG.md with the details.
2 changes: 1 addition & 1 deletion dev/airflow-github
@@ -408,7 +408,7 @@ def api_clients_policy(previous_version, target_version):
repo,
previous_version,
target_version,
files=[f"{repo.working_dir}/airflow/api_connexion/openapi/v1.yaml"],
files=[f"{repo.working_dir}/clients/python/openapi_v1.yaml"],
)

clients_need_release = False
32 changes: 0 additions & 32 deletions dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -80,8 +80,6 @@
FULL_TESTS_NEEDED_LABEL = "full tests needed"
INCLUDE_SUCCESS_OUTPUTS_LABEL = "include success outputs"
LATEST_VERSIONS_ONLY_LABEL = "latest versions only"
LEGACY_UI_LABEL = "legacy ui"
LEGACY_API_LABEL = "legacy api"
LOG_WITHOUT_MOCK_IN_TESTS_EXCEPTION_LABEL = "log exception"
NON_COMMITTER_BUILD_LABEL = "non committer build"
UPGRADE_TO_NEWER_DEPENDENCIES_LABEL = "upgrade to newer dependencies"
@@ -102,7 +100,6 @@ class FileGroupForCi(Enum):
ALWAYS_TESTS_FILES = "always_test_files"
API_FILES = "api_files"
API_CODEGEN_FILES = "api_codegen_files"
LEGACY_API_FILES = "legacy_api_files"
HELM_FILES = "helm_files"
DEPENDENCY_FILES = "dependency_files"
DOC_FILES = "doc_files"
@@ -171,9 +168,6 @@ def __hash__(self):
r"^airflow/api_fastapi/core_api/openapi/v1-generated\.yaml",
r"^clients/gen",
],
FileGroupForCi.LEGACY_API_FILES: [
r"^airflow/api_connexion/",
],
FileGroupForCi.HELM_FILES: [
r"^chart",
r"^airflow/kubernetes",
@@ -288,10 +282,8 @@
{
SelectiveCoreTestType.API: [
r"^airflow/api/",
r"^airflow/api_connexion/",
r"^airflow/api_fastapi/",
r"^tests/api/",
r"^tests/api_connexion/",
r"^tests/api_fastapi/",
],
SelectiveCoreTestType.CLI: [
@@ -1510,30 +1502,6 @@ def _is_canary_run(self):
and self._github_repository == APACHE_AIRFLOW_GITHUB_REPOSITORY
) or CANARY_LABEL in self._pr_labels

@cached_property
def is_legacy_ui_api_labeled(self) -> bool:
# Selective check for legacy UI/API updates.
# It is to ping the maintainer to add the label and make them aware of the changes.
if self._is_canary_run() or self._github_event not in (
GithubEvents.PULL_REQUEST,
GithubEvents.PULL_REQUEST_TARGET,
):
return False

if (
self._matching_files(
FileGroupForCi.LEGACY_API_FILES, CI_FILE_GROUP_MATCHES, CI_FILE_GROUP_EXCLUDES
)
and LEGACY_API_LABEL not in self._pr_labels
):
get_console().print(
f"[error]Please ask maintainer to assign "
f"the '{LEGACY_API_LABEL}' label to the PR in order to continue"
)
sys.exit(1)
else:
return True

@classmethod
def _find_caplog_in_def(cls, added_lines):
"""