diff --git a/airflow/api_connexion/endpoints/dag_parsing.py b/airflow/api_connexion/endpoints/dag_parsing.py
new file mode 100644
index 0000000000000..8c48888629b2b
--- /dev/null
+++ b/airflow/api_connexion/endpoints/dag_parsing.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Sequence
+
+from flask import Response, current_app
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy import exc, select
+
+from airflow.api_connexion import security
+from airflow.api_connexion.exceptions import NotFound, PermissionDenied
+from airflow.auth.managers.models.resource_details import DagDetails
+from airflow.models.dag import DagModel
+from airflow.models.dagbag import DagPriorityParsingRequest
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.www.extensions.init_auth_manager import get_auth_manager
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest
+
+
+@security.requires_access_dag("PUT")
+@provide_session
+def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response:
+    """Request re-parsing of a DAG file."""
+    secret_key = current_app.config["SECRET_KEY"]
+    auth_s = URLSafeSerializer(secret_key)
+    try:
+        path = auth_s.loads(file_token)
+    except BadSignature:
+        raise NotFound("File not found")
+
+    requests: Sequence[IsAuthorizedDagRequest] = [
+        {"method": "PUT", "details": DagDetails(id=dag_id)}
+        for dag_id in session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == path))
+    ]
+    if not requests:
+        raise NotFound("File not found")
+
+    # Check if the user has edit access to all the DAGs defined in the file
+    if not get_auth_manager().batch_is_authorized_dag(requests):
+        raise PermissionDenied()
+
+    parsing_request = DagPriorityParsingRequest(fileloc=path)
+    session.add(parsing_request)
+    try:
+        session.commit()
+    except exc.IntegrityError:
+        session.rollback()
+        return Response("Duplicate request", HTTPStatus.CREATED)
+    return Response(status=HTTPStatus.CREATED)
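For context on how the endpoint is meant to be driven: the `file_token` it expects is the same signed token the existing `GET /dags` response already exposes for every DAG, so a client never constructs it by hand. The sketch below is a minimal illustration rather than part of this change; the base URL and basic-auth credentials are assumptions for a local test deployment.

```python
import requests

BASE_URL = "http://localhost:8080/api/v1"  # assumed local webserver
AUTH = ("admin", "admin")  # assumed basic-auth credentials

# Every DAG in the list response carries a signed `file_token` for its file.
dag = requests.get(f"{BASE_URL}/dags", auth=AUTH).json()["dags"][0]

# Ask the DAG processor to re-parse that file with priority.
resp = requests.put(f"{BASE_URL}/parseDagFile/{dag['file_token']}", auth=AUTH)
print(resp.status_code)  # 201 on success; duplicates also return 201 (see above)
```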
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index b5e3ef72e1c61..eddfb8ca2f5e1 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1091,6 +1091,27 @@ paths:
         "404":
           $ref: "#/components/responses/NotFound"
 
+  /parseDagFile/{file_token}:
+    parameters:
+      - $ref: "#/components/parameters/FileToken"
+
+    put:
+      summary: Request re-parsing of a DAG file
+      description: >
+        Request re-parsing of existing DAG files using a file token.
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_parsing
+      operationId: reparse_dag_file
+      tags: [ DAG ]
+      responses:
+        "201":
+          description: Success.
+        "401":
+          $ref: "#/components/responses/Unauthenticated"
+        "403":
+          $ref: "#/components/responses/PermissionDenied"
+        "404":
+          $ref: "#/components/responses/NotFound"
+
   /datasets/queuedEvent/{uri}:
     parameters:
       - $ref: "#/components/parameters/DatasetURI"
@@ -3159,6 +3180,7 @@ components:
 
           *New in version 2.5.0*
         nullable: true
+
     UpdateDagRunState:
       type: object
       description: |
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index b2931feb5d756..074d83585bd9a 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -46,6 +46,7 @@
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.models.dag import DagModel
+from airflow.models.dagbag import DagPriorityParsingRequest
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ParseImportError
@@ -616,6 +617,7 @@ def _run_parsing_loop(self):
             elif refreshed_dag_dir:
                 self.add_new_file_path_to_queue()
 
+            self._refresh_requested_filelocs()
             self.start_new_processes()
 
             # Update number of loop iteration.
@@ -728,6 +730,24 @@ def _add_callback_to_queue(self, request: CallbackRequest):
             self._add_paths_to_queue([request.full_filepath], True)
             Stats.incr("dag_processing.other_callback_count")
 
+    @provide_session
+    def _refresh_requested_filelocs(self, session=NEW_SESSION) -> None:
+        """Refresh filepaths from dag dir as requested by users via APIs."""
+        # Get values from DB table
+        requests = session.scalars(select(DagPriorityParsingRequest))
+        for request in requests:
+            # Check if fileloc is in valid file paths. Parsing any
+            # filepaths can be a security issue.
+            if request.fileloc in self._file_paths:
+                # Try removing the fileloc if already present
+                try:
+                    self._file_path_queue.remove(request.fileloc)
+                except ValueError:
+                    pass
+                # Enqueue fileloc to the start of the queue.
+                self._file_path_queue.appendleft(request.fileloc)
+            session.delete(request)
+
     def _refresh_dag_dir(self) -> bool:
         """Refresh file paths from dag dir if we haven't done it for too long."""
         now = timezone.utcnow()
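The queue manipulation in `_refresh_requested_filelocs` is easier to see in isolation: a requested fileloc is removed from wherever it sits in the processor's deque and pushed to the front, so the next free parsing slot picks it up first. A minimal standalone sketch with a plain `collections.deque` and made-up file names:

```python
from collections import deque

file_path_queue = deque(["file_2.py", "file_3.py", "file_4.py", "file_1.py"])
valid_file_paths = {"file_1.py", "file_2.py", "file_3.py", "file_4.py"}


def prioritize(fileloc: str) -> None:
    # Only files already known to the processor may be prioritized;
    # parsing arbitrary user-supplied paths would be a security issue.
    if fileloc not in valid_file_paths:
        return
    try:
        file_path_queue.remove(fileloc)  # drop the existing entry, if any
    except ValueError:
        pass
    file_path_queue.appendleft(fileloc)  # jump to the front of the queue


prioritize("file_1.py")
print(file_path_queue)
# deque(['file_1.py', 'file_2.py', 'file_3.py', 'file_4.py'])
```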
diff --git a/airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py b/airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py
new file mode 100644
index 0000000000000..9d50bd9d5bf14
--- /dev/null
+++ b/airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Added DagPriorityParsingRequest table.
+
+Revision ID: c4602ba06b4b
+Revises: 677fdbb7fc54
+Create Date: 2024-04-17 17:12:05.473889
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "c4602ba06b4b"
+down_revision = "677fdbb7fc54"
+branch_labels = None
+depends_on = None
+airflow_version = "2.10.0"
+
+
+def upgrade():
+    """Apply Added DagPriorityParsingRequest table."""
+    op.create_table(
+        "dag_priority_parsing_request",
+        sa.Column("id", sa.String(length=32), nullable=False),
+        sa.Column("fileloc", sa.String(length=2000), nullable=False),
+        sa.PrimaryKeyConstraint("id", name=op.f("dag_priority_parsing_request_pkey")),
+    )
+
+
+def downgrade():
+    """Unapply Added DagPriorityParsingRequest table."""
+    op.drop_table("dag_priority_parsing_request")
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 1afea71c00167..a433024366268 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import hashlib
 import importlib
 import importlib.machinery
 import importlib.util
@@ -30,6 +31,10 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, NamedTuple
 
+from sqlalchemy import (
+    Column,
+    String,
+)
 from sqlalchemy.exc import OperationalError
 from tabulate import tabulate
 
@@ -43,6 +48,7 @@
     AirflowDagDuplicatedIdException,
     RemovedInAirflow3Warning,
 )
+from airflow.models.base import Base
 from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.dag_cycle_tester import check_cycle
@@ -727,3 +733,32 @@ def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
 
         security_manager = ApplessAirflowSecurityManager(session=session)
         security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
+
+
+def generate_md5_hash(context):
+    fileloc = context.get_current_parameters()["fileloc"]
+    return hashlib.md5(fileloc.encode()).hexdigest()
+
+
+class DagPriorityParsingRequest(Base):
+    """Model to store the dag parsing requests that will be prioritized when parsing files."""
+
+    __tablename__ = "dag_priority_parsing_request"
+
+    # Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
+    # on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
+    # size consistent with other tables. This is a workaround to enforce the unique constraint.
+    id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)
+
+    # The location of the file containing the DAG object
+    # Note: Do not depend on fileloc pointing to a file; in the case of a
+    # packaged DAG, it will point to the subpath of the DAG within the
+    # associated zip.
+    fileloc = Column(String(2000), nullable=False)
+
+    def __init__(self, fileloc: str) -> None:
+        super().__init__()
+        self.fileloc = fileloc
+
+    def __repr__(self) -> str:
+        return f"<DagPriorityParsingRequest: fileloc={self.fileloc}>"
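Because the model's primary key is just the MD5 digest of `fileloc`, two requests for the same file map to the same `id`, and the second `session.commit()` in the endpoint fails with an `IntegrityError` instead of inserting a second row. A short sketch of that hashing behaviour (the paths are made up; only `hashlib` is involved):

```python
import hashlib


def md5_of_fileloc(fileloc: str) -> str:
    # Same digest the model's default callable derives for the `id` column.
    return hashlib.md5(fileloc.encode()).hexdigest()


a = md5_of_fileloc("/opt/airflow/dags/example_bash_operator.py")
b = md5_of_fileloc("/opt/airflow/dags/example_bash_operator.py")
assert a == b and len(a) == 32  # identical filelocs collide on the 32-char PK
```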
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index f25594ef68838..41fa1ab2e0064 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -118,7 +118,7 @@ class MappedClassProtocol(Protocol):
     "2.8.1": "88344c1d9134",
     "2.9.0": "1949afb29106",
     "2.9.2": "686269002441",
-    "2.10.0": "677fdbb7fc54",
+    "2.10.0": "c4602ba06b4b",
 }
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index b8da89e55604b..5b8f33d7ee318 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -273,6 +273,20 @@ export interface paths {
       };
     };
   };
+  "/parseDagFile/{file_token}": {
+    /** Request re-parsing of existing DAG files using a file token. */
+    put: operations["reparse_dag_file"];
+    parameters: {
+      path: {
+        /**
+         * The key containing the encrypted path to the file. Encryption and decryption take place only on
+         * the server. This prevents the client from reading a non-DAG file. This also ensures API
+         * extensibility, because the format of encrypted data may change.
+         */
+        file_token: components["parameters"]["FileToken"];
+      };
+    };
+  };
   "/datasets/queuedEvent/{uri}": {
     /**
      * Get queued Dataset events for a Dataset
     *
@@ -3438,6 +3452,26 @@ export interface operations {
       404: components["responses"]["NotFound"];
     };
   };
+  /** Request re-parsing of existing DAG files using a file token. */
+  reparse_dag_file: {
+    parameters: {
+      path: {
+        /**
+         * The key containing the encrypted path to the file. Encryption and decryption take place only on
+         * the server. This prevents the client from reading a non-DAG file. This also ensures API
+         * extensibility, because the format of encrypted data may change.
+         */
+        file_token: components["parameters"]["FileToken"];
+      };
+    };
+    responses: {
+      /** Success. */
+      201: unknown;
+      401: components["responses"]["Unauthenticated"];
+      403: components["responses"]["PermissionDenied"];
+      404: components["responses"]["NotFound"];
+    };
+  };
   /**
    * Get queued Dataset events for a Dataset
    *
@@ -5388,6 +5422,9 @@
 export type DeleteDagDatasetQueuedEventsVariables = CamelCasedPropertiesDeep<
   operations["delete_dag_dataset_queued_events"]["parameters"]["path"] &
     operations["delete_dag_dataset_queued_events"]["parameters"]["query"]
 >;
+export type ReparseDagFileVariables = CamelCasedPropertiesDeep<
+  operations["reparse_dag_file"]["parameters"]["path"]
+>;
 export type GetDatasetQueuedEventsVariables = CamelCasedPropertiesDeep<
   operations["get_dataset_queued_events"]["parameters"]["path"] &
     operations["get_dataset_queued_events"]["parameters"]["query"]
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 7359dce5d57ee..0a2cb41215b5f 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-468c1db106e059c4a97f07b9f8be7edfa487099113e4611c74f61f17c0ea0d82
\ No newline at end of file
+6ae5e112d66c30d36fbc27a608355ffd66853e34d7538223f69a71e2eba54b59
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index fb280ee0ea7fc..621326f20e977 100644
[Regenerated ERD diagram (SVG). The only substantive change is a new dag_priority_parsing_request node with columns id [VARCHAR(32)] NOT NULL and fileloc [VARCHAR(2000)] NOT NULL; the remainder of the diff is re-rendered markup for the existing tables (job, log, dag_code, ab_user, dag, dag_run, task_instance, xcom, dataset, trigger, session, alembic_version, etc.) with no schema changes.]
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index eb01d6aec39ed..6ae768550b9a0 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | Description                                                  |
 +=================================+===================+===================+==============================================================+
-| ``677fdbb7fc54`` (head)         | ``686269002441``  | ``2.10.0``        | add new executor field to db.                                |
+| ``c4602ba06b4b`` (head)         | ``677fdbb7fc54``  | ``2.10.0``        | Added DagPriorityParsingRequest table.                       |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``677fdbb7fc54``                | ``686269002441``  | ``2.10.0``        | add new executor field to db.                                |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``686269002441``                | ``bff083ad727d``  | ``2.9.2``         | Fix inconsistency between ORM and migration files.           |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/api_connexion/endpoints/test_dag_parsing.py b/tests/api_connexion/endpoints/test_dag_parsing.py
new file mode 100644
index 0000000000000..1155e1d8841ff
--- /dev/null
+++ b/tests/api_connexion/endpoints/test_dag_parsing.py
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING
+
+import pytest
+from sqlalchemy import select
+
+from airflow.models import DagBag
+from airflow.models.dagbag import DagPriorityParsingRequest
+from airflow.security import permissions
+from tests.test_utils.api_connexion_utils import create_user, delete_user
+from tests.test_utils.db import clear_db_dag_parsing_requests
+
+pytestmark = pytest.mark.db_test
+
+if TYPE_CHECKING:
+    from airflow.models.dag import DAG
+
+ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
+EXAMPLE_DAG_FILE = os.path.join("airflow", "example_dags", "example_bash_operator.py")
+EXAMPLE_DAG_ID = "example_bash_operator"
+TEST_DAG_ID = "latest_only"
+NOT_READABLE_DAG_ID = "latest_only_with_trigger"
+TEST_MULTIPLE_DAGS_ID = "dataset_produces_1"
+
+
+@pytest.fixture(scope="module")
+def configured_app(minimal_app_for_api):
+    app = minimal_app_for_api
+    create_user(
+        app,  # type:ignore
+        username="test",
+        role_name="Test",
+        permissions=[(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)],  # type: ignore
+    )
+    app.appbuilder.sm.sync_perm_for_dag(  # type: ignore
+        TEST_DAG_ID,
+        access_control={"Test": [permissions.ACTION_CAN_EDIT]},
+    )
+    create_user(app, username="test_no_permissions", role_name="TestNoPermissions")  # type: ignore
+
+    yield app
+
+    delete_user(app, username="test")  # type: ignore
+    delete_user(app, username="test_no_permissions")  # type: ignore
+
+
+class TestDagParsingRequest:
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configured_app) -> None:
+        self.app = configured_app
+        self.client = self.app.test_client()  # type:ignore
+        self.clear_db()
+
+    def teardown_method(self) -> None:
+        self.clear_db()
+
+    @staticmethod
+    def clear_db():
+        clear_db_dag_parsing_requests()
+
+    def test_201_and_duplicate_requests(self, url_safe_serializer, session):
+        dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
+        dagbag.sync_to_db()
+        test_dag: DAG = dagbag.dags[TEST_DAG_ID]
+
+        url = f"/api/v1/parseDagFile/{url_safe_serializer.dumps(test_dag.fileloc)}"
+        response = self.client.put(
+            url, headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}
+        )
+        assert 201 == response.status_code
+        parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
+        assert parsing_requests[0].fileloc == test_dag.fileloc
+
+        # A duplicate file parsing request also returns 201 but does not add a second row
+        response = self.client.put(
+            url, headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}
+        )
+        assert 201 == response.status_code
+        parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
+        assert parsing_requests[0].fileloc == test_dag.fileloc
+
+    def test_bad_file_request(self, url_safe_serializer, session):
+        url = f"/api/v1/parseDagFile/{url_safe_serializer.dumps('/some/random/file.py')}"
+        response = self.client.put(
+            url, headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}
+        )
+        assert response.status_code == 404
+
+        parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
+        assert parsing_requests == []
+
+    def test_bad_user_request(self, url_safe_serializer, session):
+        url = f"/api/v1/parseDagFile/{url_safe_serializer.dumps('/some/random/file.py')}"
+        response = self.client.put(
+            url,
+            headers={"Accept": "application/json"},
+            environ_overrides={"REMOTE_USER": "test_no_permissions"},
+        )
+        assert response.status_code == 403
+
+        parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
+        assert parsing_requests == []
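For reference, the `url_safe_serializer` fixture used in these tests amounts to signing the file path with the webserver's `SECRET_KEY`, which is exactly what the endpoint reverses with `loads()`. A standalone sketch of that round trip (the secret value is illustrative):

```python
from itsdangerous import BadSignature, URLSafeSerializer

serializer = URLSafeSerializer("my-webserver-secret-key")  # assumed SECRET_KEY

file_token = serializer.dumps("/files/dags/example_bash_operator.py")
try:
    assert serializer.loads(file_token) == "/files/dags/example_bash_operator.py"
    serializer.loads(file_token + "tampered")  # signature check fails
except BadSignature:
    print("tampered tokens are rejected, which the endpoint maps to 404")
```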
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index e35e2fb97f90c..0ebb6466aa895 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -576,6 +576,46 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
             > (freezed_base_time - manager.processor.get_last_finish_time("file_1.py")).total_seconds()
         )
 
+    @mock.patch("sqlalchemy.orm.session.Session.delete")
+    @mock.patch("zipfile.is_zipfile", return_value=True)
+    @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+    @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+    def test_file_paths_in_queue_sorted_by_priority(
+        self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile, session_delete
+    ):
+        """Test that a requested fileloc is moved to the front of the file path queue."""
+        from airflow.models.dagbag import DagPriorityParsingRequest
+
+        parsing_request = DagPriorityParsingRequest(fileloc="file_1.py")
+        with create_session() as session:
+            session.add(parsing_request)
+            session.commit()
+
+        dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
+        mock_find_path.return_value = dag_files
+
+        manager = DagProcessorJobRunner(
+            job=Job(),
+            processor=DagFileProcessorManager(
+                dag_directory="directory",
+                max_runs=1,
+                processor_timeout=timedelta(days=365),
+                signal_conn=MagicMock(),
+                dag_ids=[],
+                pickle_dags=False,
+                async_mode=True,
+            ),
+        )
+
+        manager.processor.set_file_paths(dag_files)
+        manager.processor._file_path_queue = deque(["file_2.py", "file_3.py", "file_4.py", "file_1.py"])
+        manager.processor._refresh_requested_filelocs()
+        assert manager.processor._file_path_queue == deque(
+            ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
+        )
+        assert session_delete.call_args[0][0].fileloc == parsing_request.fileloc
+
     def test_scan_stale_dags(self):
         """
         Ensure that DAGs are marked inactive when the file is parsed but the
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index 900598e20f296..783362b8b1bef 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -36,6 +36,7 @@
     XCom,
 )
 from airflow.models.dag import DagOwnerAttributes
+from airflow.models.dagbag import DagPriorityParsingRequest
 from airflow.models.dagcode import DagCode
 from airflow.models.dagwarning import DagWarning
 from airflow.models.dataset import (
@@ -169,6 +170,11 @@ def clear_db_task_reschedule():
         session.query(TaskReschedule).delete()
 
 
+def clear_db_dag_parsing_requests():
+    with create_session() as session:
+        session.query(DagPriorityParsingRequest).delete()
+
+
 def clear_dag_specific_permissions():
     with create_session() as session:
         dag_resources = session.query(Resource).filter(Resource.name.like(f"{RESOURCE_DAG_PREFIX}%")).all()
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 6854dd7d99593..6b7a5df20859f 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -344,6 +344,8 @@ def test_no_models_missing(self):
             "task_instance_note",  # foreign keys
             "dag_run_note",  # foreign keys
             "rendered_task_instance_fields",  # foreign key with TI
+            "dag_priority_parsing_request",  # Records are purged once per DAG processing loop;
+            # not a significant source of data.
         }
 
         from airflow.utils.db_cleanup import config_dict