From 8ce3dc8684b4fdccb9a7683c2a6c940a5cf37cfc Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 15 Jul 2024 15:08:38 +0800 Subject: [PATCH] Link dataset event to dataset alias (#40723) * feat(dataset_alias) * add dataset_alias_dataset_event association table * link dataset_alias to triggered dataset events * add FK to dataset_alias_dataset_event_assocation_table --- airflow/datasets/manager.py | 8 + ...0148_2_10_0_dataset_alias_dataset_event.py | 68 + airflow/models/dataset.py | 27 + airflow/models/taskinstance.py | 9 +- airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 2760 +++++++++-------- docs/apache-airflow/migrations-ref.rst | 4 +- tests/models/test_taskinstance.py | 72 +- 9 files changed, 1562 insertions(+), 1390 deletions(-) create mode 100644 airflow/migrations/versions/0148_2_10_0_dataset_alias_dataset_event.py diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py index 2861b581718f..ac4c7001c9ab 100644 --- a/airflow/datasets/manager.py +++ b/airflow/datasets/manager.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +from collections.abc import Iterable from typing import TYPE_CHECKING from sqlalchemy import exc, select @@ -28,6 +29,7 @@ from airflow.listeners.listener import get_listener_manager from airflow.models.dataset import ( DagScheduleDatasetReference, + DatasetAliasModel, DatasetDagRunQueue, DatasetEvent, DatasetModel, @@ -73,6 +75,7 @@ def register_dataset_change( dataset: Dataset, extra=None, session: Session = NEW_SESSION, + source_alias_names: Iterable[str] | None = None, **kwargs, ) -> DatasetEvent | None: """ @@ -105,6 +108,11 @@ def register_dataset_change( } ) dataset_event = DatasetEvent(**event_kwargs) + if source_alias_names: + dataset_alias_models = session.scalars( + select(DatasetAliasModel).where(DatasetAliasModel.name.in_(source_alias_names)) + ) + dataset_event.source_aliases.extend(dataset_alias_models) session.add(dataset_event) session.flush() diff --git a/airflow/migrations/versions/0148_2_10_0_dataset_alias_dataset_event.py b/airflow/migrations/versions/0148_2_10_0_dataset_alias_dataset_event.py new file mode 100644 index 000000000000..15f70dab71ac --- /dev/null +++ b/airflow/migrations/versions/0148_2_10_0_dataset_alias_dataset_event.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add dataset_alias_dataset_event. + +Revision ID: ec3471c1e067 +Revises: 05e19f3176be +Create Date: 2024-07-11 09:42:00.643179 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "ec3471c1e067" +down_revision = "05e19f3176be" +branch_labels = None +depends_on = None +airflow_version = "2.10.0" + + +def upgrade(): + """Add dataset_alias_dataset_event table.""" + op.create_table( + "dataset_alias_dataset_event", + sa.Column("alias_id", sa.Integer(), nullable=False), + sa.Column("event_id", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["alias_id"], + ["dataset_alias.id"], + name=op.f("dataset_alias_dataset_event_alias_id_fkey"), + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["event_id"], + ["dataset_event.id"], + name=op.f("dataset_alias_dataset_event_event_id_fkey"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("alias_id", "event_id", name=op.f("dataset_alias_dataset_event_pkey")), + ) + with op.batch_alter_table("dataset_alias_dataset_event", schema=None) as batch_op: + batch_op.create_index("idx_dataset_alias_dataset_event_alias_id", ["alias_id"], unique=False) + batch_op.create_index("idx_dataset_alias_dataset_event_event_id", ["event_id"], unique=False) + + +def downgrade(): + """Drop dataset_alias_dataset_event table.""" + op.drop_table("dataset_alias_dataset_event") diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 0ac8ce75e1c3..0825eeea51c0 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -284,6 +284,27 @@ def __repr__(self): Index("idx_dagrun_dataset_events_event_id", "event_id"), ) +dataset_alias_dataset_event_assocation_table = Table( + "dataset_alias_dataset_event", + Base.metadata, + Column("alias_id", ForeignKey("dataset_alias.id", ondelete="CASCADE"), primary_key=True), + Column("event_id", ForeignKey("dataset_event.id", ondelete="CASCADE"), primary_key=True), + Index("idx_dataset_alias_dataset_event_alias_id", "alias_id"), + Index("idx_dataset_alias_dataset_event_event_id", "event_id"), + ForeignKeyConstraint( + ("alias_id",), + ["dataset_alias.id"], + name="dss_de_alias_id", + ondelete="CASCADE", + ), + ForeignKeyConstraint( + ("event_id",), + ["dataset_event.id"], + name="dss_de_event_id", + ondelete="CASCADE", + ), +) + class DatasetEvent(Base): """ @@ -322,6 +343,12 @@ class DatasetEvent(Base): backref="consumed_dataset_events", ) + source_aliases = relationship( + "DatasetAliasModel", + secondary=dataset_alias_dataset_event_assocation_table, + backref="dataset_events", + ) + source_task_instance = relationship( "TaskInstance", primaryjoin="""and_( diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e63fc07d1d9b..80b3eedbc843 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2915,7 +2915,7 @@ def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Se # One task only triggers one dataset event for each dataset with the same extra. # This tuple[dataset uri, extra] to sets alias names mapping is used to find whether # there're datasets with same uri but different extra that we need to emit more than one dataset events. - dataset_tuple_to_aliases_mapping: dict[tuple[str, frozenset], set[str]] = defaultdict(set) + dataset_tuple_to_alias_names_mapping: dict[tuple[str, frozenset], set[str]] = defaultdict(set) for obj in self.task.outlets or []: self.log.debug("outlet obj %s", obj) # Lineage can have other types of objects besides datasets @@ -2933,10 +2933,10 @@ def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Se frozen_extra = frozenset(extra.items()) dataset_alias_name = dataset_alias_event["source_alias_name"] - dataset_tuple_to_aliases_mapping[(dataset_uri, frozen_extra)].add(dataset_alias_name) + dataset_tuple_to_alias_names_mapping[(dataset_uri, frozen_extra)].add(dataset_alias_name) dataset_objs_cache: dict[str, DatasetModel] = {} - for (uri, extra_items), aliases in dataset_tuple_to_aliases_mapping.items(): + for (uri, extra_items), alias_names in dataset_tuple_to_alias_names_mapping.items(): if uri not in dataset_objs_cache: dataset_obj = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri).limit(1)) dataset_objs_cache[uri] = dataset_obj @@ -2954,13 +2954,14 @@ def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Se 'Create dataset event Dataset(uri="%s", extra="%s") through dataset aliases "%s"', uri, extra, - ", ".join(aliases), + ", ".join(alias_names), ) dataset_manager.register_dataset_change( task_instance=self, dataset=dataset_obj, extra=extra, session=session, + source_alias_names=alias_names, ) def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session): diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 427a19d80c31..463b6894f522 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -118,7 +118,7 @@ class MappedClassProtocol(Protocol): "2.8.1": "88344c1d9134", "2.9.0": "1949afb29106", "2.9.2": "686269002441", - "2.10.0": "05e19f3176be", + "2.10.0": "ec3471c1e067", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index fd03ab1b62ab..a3624746f5ec 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -a349dbd245d5e7ddf7a3b0dc4d50bc9c779d26affc9a3332d0e0e8953a454980 \ No newline at end of file +88c9ce0742c54f376a3c600600d06ac9a6f80a3a2cb85dfcf2472d1c77ed75db \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index b010c3f1317b..0328672c20a8 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,363 +4,347 @@ - - + + %3 - - - -dataset_alias - -dataset_alias - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(3000)] - NOT NULL - + - + job - -job - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -end_date - - [TIMESTAMP] - -executor_class - - [VARCHAR(500)] - -hostname - - [VARCHAR(500)] - -job_type - - [VARCHAR(30)] - -latest_heartbeat - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -unixname - - [VARCHAR(1000)] + +job + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +end_date + + [TIMESTAMP] + +executor_class + + [VARCHAR(500)] + +hostname + + [VARCHAR(500)] + +job_type + + [VARCHAR(30)] + +latest_heartbeat + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +unixname + + [VARCHAR(1000)] - + slot_pool - -slot_pool - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -include_deferred - - [BOOLEAN] - NOT NULL - -pool - - [VARCHAR(256)] - -slots - - [INTEGER] + +slot_pool + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +include_deferred + + [BOOLEAN] + NOT NULL + +pool + + [VARCHAR(256)] + +slots + + [INTEGER] - + dag_priority_parsing_request - -dag_priority_parsing_request - -id - - [VARCHAR(32)] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL + +dag_priority_parsing_request + +id + + [VARCHAR(32)] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL - + log - -log - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -dttm - - [TIMESTAMP] - -event - - [VARCHAR(60)] - -execution_date - - [TIMESTAMP] - -extra - - [TEXT] - -map_index - - [INTEGER] - -owner - - [VARCHAR(500)] - -owner_display_name - - [VARCHAR(500)] - -run_id - - [VARCHAR(250)] - -task_id - - [VARCHAR(250)] + +log + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +dttm + + [TIMESTAMP] + +event + + [VARCHAR(60)] + +execution_date + + [TIMESTAMP] + +extra + + [TEXT] + +map_index + + [INTEGER] + +owner + + [VARCHAR(500)] + +owner_display_name + + [VARCHAR(500)] + +run_id + + [VARCHAR(250)] + +task_id + + [VARCHAR(250)] - + dag_code - -dag_code - -fileloc_hash - - [BIGINT] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -source_code - - [TEXT] - NOT NULL + +dag_code + +fileloc_hash + + [BIGINT] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +source_code + + [TEXT] + NOT NULL - + dag_pickle - -dag_pickle - -id - - [INTEGER] - NOT NULL - -created_dttm - - [TIMESTAMP] - -pickle - - [BYTEA] - -pickle_hash - - [BIGINT] + +dag_pickle + +id + + [INTEGER] + NOT NULL + +created_dttm + + [TIMESTAMP] + +pickle + + [BYTEA] + +pickle_hash + + [BIGINT] - + ab_user - -ab_user - -id - - [INTEGER] - NOT NULL - -active - - [BOOLEAN] - -changed_by_fk - - [INTEGER] - -changed_on - - [TIMESTAMP] - -created_by_fk - - [INTEGER] - -created_on - - [TIMESTAMP] - -email - - [VARCHAR(512)] - NOT NULL - -fail_login_count - - [INTEGER] - -first_name - - [VARCHAR(256)] - NOT NULL - -last_login - - [TIMESTAMP] - -last_name - - [VARCHAR(256)] - NOT NULL - -login_count - - [INTEGER] - -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_user + +id + + [INTEGER] + NOT NULL + +active + + [BOOLEAN] + +changed_by_fk + + [INTEGER] + +changed_on + + [TIMESTAMP] + +created_by_fk + + [INTEGER] + +created_on + + [TIMESTAMP] + +email + + [VARCHAR(512)] + NOT NULL + +fail_login_count + + [INTEGER] + +first_name + + [VARCHAR(256)] + NOT NULL + +last_login + + [TIMESTAMP] + +last_name + + [VARCHAR(256)] + NOT NULL + +login_count + + [INTEGER] + +password + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} - + dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [INTEGER] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [INTEGER] ab_user--dag_run_note - -0..N -{0,1} + +0..N +{0,1} - + task_instance_note task_instance_note @@ -406,794 +390,903 @@ ab_user--task_instance_note - -0..N -{0,1} + +0..N +{0,1} - + ab_register_user - -ab_register_user - -id - - [INTEGER] - NOT NULL - -email - - [VARCHAR(512)] - NOT NULL - -first_name - - [VARCHAR(256)] - NOT NULL - -last_name - - [VARCHAR(256)] - NOT NULL - -password - - [VARCHAR(256)] - -registration_date - - [TIMESTAMP] - -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_register_user + +id + + [INTEGER] + NOT NULL + +email + + [VARCHAR(512)] + NOT NULL + +first_name + + [VARCHAR(256)] + NOT NULL + +last_name + + [VARCHAR(256)] + NOT NULL + +password + + [VARCHAR(256)] + +registration_date + + [TIMESTAMP] + +registration_hash + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL - + connection - -connection - -id - - [INTEGER] - NOT NULL - -conn_id - - [VARCHAR(250)] - NOT NULL - -conn_type - - [VARCHAR(500)] - NOT NULL - -description - - [TEXT] - -extra - - [TEXT] - -host - - [VARCHAR(500)] - -is_encrypted - - [BOOLEAN] - -is_extra_encrypted - - [BOOLEAN] - -login - - [TEXT] - -password - - [TEXT] - -port - - [INTEGER] - -schema - - [VARCHAR(500)] + +connection + +id + + [INTEGER] + NOT NULL + +conn_id + + [VARCHAR(250)] + NOT NULL + +conn_type + + [VARCHAR(500)] + NOT NULL + +description + + [TEXT] + +extra + + [TEXT] + +host + + [VARCHAR(500)] + +is_encrypted + + [BOOLEAN] + +is_extra_encrypted + + [BOOLEAN] + +login + + [TEXT] + +password + + [TEXT] + +port + + [INTEGER] + +schema + + [VARCHAR(500)] - + callback_request - -callback_request - -id - - [INTEGER] - NOT NULL - -callback_data - - [JSON] - NOT NULL - -callback_type - - [VARCHAR(20)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -priority_weight - - [INTEGER] - NOT NULL - -processor_subdir - - [VARCHAR(2000)] + +callback_request + +id + + [INTEGER] + NOT NULL + +callback_data + + [JSON] + NOT NULL + +callback_type + + [VARCHAR(20)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +priority_weight + + [INTEGER] + NOT NULL + +processor_subdir + + [VARCHAR(2000)] - + sla_miss - -sla_miss - -dag_id - - [VARCHAR(250)] - NOT NULL - -execution_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -description - - [TEXT] - -email_sent - - [BOOLEAN] - -notification_sent - - [BOOLEAN] - -timestamp - - [TIMESTAMP] + +sla_miss + +dag_id + + [VARCHAR(250)] + NOT NULL + +execution_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +description + + [TEXT] + +email_sent + + [BOOLEAN] + +notification_sent + + [BOOLEAN] + +timestamp + + [TIMESTAMP] - + variable - -variable - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -is_encrypted - - [BOOLEAN] - -key - - [VARCHAR(250)] - -val - - [TEXT] + +variable + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +is_encrypted + + [BOOLEAN] + +key + + [VARCHAR(250)] + +val + + [TEXT] - + import_error - -import_error - -id - - [INTEGER] - NOT NULL - -filename - - [VARCHAR(1024)] - -processor_subdir - - [VARCHAR(2000)] - -stacktrace - - [TEXT] - -timestamp - - [TIMESTAMP] + +import_error + +id + + [INTEGER] + NOT NULL + +filename + + [VARCHAR(1024)] + +processor_subdir + + [VARCHAR(2000)] + +stacktrace + + [TEXT] + +timestamp + + [TIMESTAMP] - + serialized_dag - -serialized_dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -data - - [JSON] - -data_compressed - - [BYTEA] - -fileloc - - [VARCHAR(2000)] - NOT NULL - -fileloc_hash - - [BIGINT] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -processor_subdir - - [VARCHAR(2000)] + +serialized_dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +data + + [JSON] + +data_compressed + + [BYTEA] + +fileloc + + [VARCHAR(2000)] + NOT NULL + +fileloc_hash + + [BIGINT] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +processor_subdir + + [VARCHAR(2000)] - + dataset - -dataset - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -extra - - [JSON] - NOT NULL - -is_orphaned - - [BOOLEAN] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(3000)] - NOT NULL + +dataset + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +extra + + [JSON] + NOT NULL + +is_orphaned + + [BOOLEAN] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +uri + + [VARCHAR(3000)] + NOT NULL - + dag_schedule_dataset_reference - -dag_schedule_dataset_reference - -dag_id - - [VARCHAR(250)] - NOT NULL - -dataset_id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_dataset_reference + +dag_id + + [VARCHAR(250)] + NOT NULL + +dataset_id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL dataset--dag_schedule_dataset_reference - -0..N -1 + +0..N +1 - + task_outlet_dataset_reference - -task_outlet_dataset_reference - -dag_id - - [VARCHAR(250)] - NOT NULL - -dataset_id - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_dataset_reference + +dag_id + + [VARCHAR(250)] + NOT NULL + +dataset_id + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL dataset--task_outlet_dataset_reference - -0..N -1 + +0..N +1 - + dataset_dag_run_queue - -dataset_dag_run_queue - -dataset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +dataset_dag_run_queue + +dataset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL dataset--dataset_dag_run_queue - -0..N -1 + +0..N +1 - + dag - -dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_display_name - - [VARCHAR(2000)] - -dataset_expression - - [JSON] - -default_view - - [VARCHAR(25)] - -description - - [TEXT] - -fileloc - - [VARCHAR(2000)] - -has_import_errors - - [BOOLEAN] - -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL - -is_active - - [BOOLEAN] - -is_paused - - [BOOLEAN] - -is_subdag - - [BOOLEAN] - -last_expired - - [TIMESTAMP] - -last_parsed_time - - [TIMESTAMP] - -last_pickled - - [TIMESTAMP] - -max_active_runs - - [INTEGER] - -max_active_tasks - - [INTEGER] - NOT NULL - -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL - -next_dagrun - - [TIMESTAMP] - -next_dagrun_create_after - - [TIMESTAMP] - -next_dagrun_data_interval_end - - [TIMESTAMP] - -next_dagrun_data_interval_start - - [TIMESTAMP] - -owners - - [VARCHAR(2000)] - -pickle_id - - [INTEGER] - -processor_subdir - - [VARCHAR(2000)] - -root_dag_id - - [VARCHAR(250)] - -schedule_interval - - [TEXT] - -scheduler_lock - - [BOOLEAN] - -timetable_description - - [VARCHAR(1000)] + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_display_name + + [VARCHAR(2000)] + +dataset_expression + + [JSON] + +default_view + + [VARCHAR(25)] + +description + + [TEXT] + +fileloc + + [VARCHAR(2000)] + +has_import_errors + + [BOOLEAN] + +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL + +is_active + + [BOOLEAN] + +is_paused + + [BOOLEAN] + +is_subdag + + [BOOLEAN] + +last_expired + + [TIMESTAMP] + +last_parsed_time + + [TIMESTAMP] + +last_pickled + + [TIMESTAMP] + +max_active_runs + + [INTEGER] + +max_active_tasks + + [INTEGER] + NOT NULL + +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL + +next_dagrun + + [TIMESTAMP] + +next_dagrun_create_after + + [TIMESTAMP] + +next_dagrun_data_interval_end + + [TIMESTAMP] + +next_dagrun_data_interval_start + + [TIMESTAMP] + +owners + + [VARCHAR(2000)] + +pickle_id + + [INTEGER] + +processor_subdir + + [VARCHAR(2000)] + +root_dag_id + + [VARCHAR(250)] + +schedule_interval + + [TEXT] + +scheduler_lock + + [BOOLEAN] + +timetable_description + + [VARCHAR(1000)] dag--dag_schedule_dataset_reference - -0..N -1 + +0..N +1 dag--task_outlet_dataset_reference - -0..N -1 + +0..N +1 dag--dataset_dag_run_queue - -0..N -1 + +0..N +1 - + dag_tag - -dag_tag - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -1 + +0..N +1 - + dag_owner_attributes - -dag_owner_attributes - -dag_id - - [VARCHAR(250)] - NOT NULL - -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL + +owner + + [VARCHAR(500)] + NOT NULL + +link + + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -1 + +0..N +1 - + dag_warning - -dag_warning - -dag_id - - [VARCHAR(250)] - NOT NULL - -warning_type - - [VARCHAR(50)] - NOT NULL - -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL + +warning_type + + [VARCHAR(50)] + NOT NULL + +message + + [TEXT] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -1 + +0..N +1 - + + +dataset_alias + +dataset_alias + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(3000)] + NOT NULL + + +dataset_alias_dataset_event + +dataset_alias_dataset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL + + + +dataset_alias--dataset_alias_dataset_event + +0..N +1 + + + +dataset_alias--dataset_alias_dataset_event + +0..N +1 + + + +dataset_event + +dataset_event + +id + + [INTEGER] + NOT NULL + +dataset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL + + + +dataset_event--dataset_alias_dataset_event + +0..N +1 + + + +dataset_event--dataset_alias_dataset_event + +0..N +1 + + + +dagrun_dataset_event + +dagrun_dataset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL + + + +dataset_event--dagrun_dataset_event + +0..N +1 + + + log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL - + dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -clear_number - - [INTEGER] - NOT NULL - -conf - - [BYTEA] - -creating_job_id - - [INTEGER] - -dag_hash - - [VARCHAR(32)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -execution_date - - [TIMESTAMP] - NOT NULL - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +clear_number + + [INTEGER] + NOT NULL + +conf + + [BYTEA] + +creating_job_id + + [INTEGER] + +dag_hash + + [VARCHAR(32)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +execution_date + + [TIMESTAMP] + NOT NULL + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] - + log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} - + dag_run--dag_run_note - -1 -1 - - - -dagrun_dataset_event - -dagrun_dataset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +1 +1 - + dag_run--dagrun_dataset_event - -0..N -1 + +0..N +1 - + task_instance task_instance @@ -1333,210 +1426,210 @@ [TIMESTAMP] - + dag_run--task_instance - -0..N -1 + +0..N +1 - + dag_run--task_instance - -0..N -1 + +0..N +1 - + task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL - + dag_run--task_reschedule - -0..N -1 + +0..N +1 - + dag_run--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_instance_note 0..N 1 - + task_instance--task_instance_note 0..N 1 - + task_instance--task_instance_note 0..N 1 - + task_instance--task_instance_note 0..N 1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_fail task_fail @@ -1579,35 +1672,35 @@ NOT NULL - + task_instance--task_fail 0..N 1 - + task_instance--task_fail 0..N 1 - + task_instance--task_fail 0..N 1 - + task_instance--task_fail 0..N 1 - + task_map task_map @@ -1642,35 +1735,35 @@ NOT NULL - + task_instance--task_map 0..N 1 - + task_instance--task_map 0..N 1 - + task_instance--task_map 0..N 1 - + task_instance--task_map 0..N 1 - + xcom xcom @@ -1715,35 +1808,35 @@ [BYTEA] - + task_instance--xcom 0..N 1 - + task_instance--xcom 0..N 1 - + task_instance--xcom 0..N 1 - + task_instance--xcom 0..N 1 - + task_instance_history task_instance_history @@ -1889,273 +1982,224 @@ [TIMESTAMP] - + task_instance--task_instance_history 0..N 1 - + task_instance--task_instance_history 0..N 1 - + task_instance--task_instance_history 0..N 1 - + task_instance--task_instance_history 0..N 1 - + ab_permission - -ab_permission - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL - + ab_permission_view - -ab_permission_view - -id - - [INTEGER] - NOT NULL - -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] + +ab_permission_view + +id + + [INTEGER] + NOT NULL + +permission_id + + [INTEGER] + +view_menu_id + + [INTEGER] - + ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] - + ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} - + ab_view_menu - -ab_view_menu - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(250)] + NOT NULL - + ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_role - -ab_role - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL + +ab_role + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(64)] + NOT NULL - + ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} - + ab_role--ab_permission_view_role - -0..N -{0,1} - - - -dataset_event - -dataset_event - -id - - [INTEGER] - NOT NULL - -dataset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL - - - -dataset_event--dagrun_dataset_event - -0..N -1 + +0..N +{0,1} - + trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] - + trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} - + session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] + +session + +id + + [INTEGER] + NOT NULL + +data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] - + alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 4f0758fba125..95fe3dc59140 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``05e19f3176be`` (head) | ``d482b7261ff9`` | ``2.10.0`` | Add dataset_alias. | +| ``ec3471c1e067`` (head) | ``05e19f3176be`` | ``2.10.0`` | Add dataset_alias_dataset_event. | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``05e19f3176be`` | ``d482b7261ff9`` | ``2.10.0`` | Add dataset_alias. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``d482b7261ff9`` | ``c4602ba06b4b`` | ``2.10.0`` | Add task_instance_history. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 2e14371598b3..577c2a733002 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2458,17 +2458,18 @@ def _write2_post_execute(context, result): def test_outlet_dataset_alias(self, dag_maker, session): from airflow.datasets import Dataset, DatasetAlias - ds1 = DatasetModel(id=1, uri="test_outlet_dataset_alias_test_case_ds") + ds_uri = "test_outlet_dataset_alias_test_case_ds" + dsa_name_1 = "test_outlet_dataset_alias_test_case_dsa_1" + + ds1 = DatasetModel(id=1, uri=ds_uri) session.add(ds1) session.commit() with dag_maker(dag_id="producer_dag", schedule=None, session=session) as dag: - @task(outlets=DatasetAlias("test_outlet_dataset_alias_test_case_dsa")) + @task(outlets=DatasetAlias(dsa_name_1)) def producer(*, outlet_events): - outlet_events["test_outlet_dataset_alias_test_case_dsa"].add( - Dataset("test_outlet_dataset_alias_test_case_ds") - ) + outlet_events[dsa_name_1].add(Dataset(ds_uri)) producer() @@ -2489,22 +2490,30 @@ def producer(*, outlet_events): assert producer_event.source_dag_id == "producer_dag" assert producer_event.source_run_id == "test" assert producer_event.source_map_index == -1 - assert producer_event.dataset.uri == "test_outlet_dataset_alias_test_case_ds" + assert producer_event.dataset.uri == ds_uri + assert len(producer_event.source_aliases) == 1 assert producer_event.extra == {} + assert producer_event.source_aliases[0].name == dsa_name_1 def test_outlet_multiple_dataset_alias(self, dag_maker, session): from airflow.datasets import Dataset, DatasetAlias - ds1 = DatasetModel(id=1, uri="test_outlet_mdsa_ds") + ds_uri = "test_outlet_mdsa_ds" + dsa_name_1 = "test_outlet_mdsa_dsa_1" + dsa_name_2 = "test_outlet_mdsa_dsa_2" + dsa_name_3 = "test_outlet_mdsa_dsa_3" + + ds1 = DatasetModel(id=1, uri=ds_uri) session.add(ds1) session.commit() with dag_maker(dag_id="producer_dag", schedule=None, session=session) as dag: - @task(outlets=[DatasetAlias("test_outlet_mdsa_dsa_1"), DatasetAlias("test_outlet_mdsa_dsa_2")]) + @task(outlets=[DatasetAlias(dsa_name_1), DatasetAlias(dsa_name_2), DatasetAlias(dsa_name_3)]) def producer(*, outlet_events): - outlet_events["test_outlet_mdsa_dsa_1"].add(Dataset("test_outlet_mdsa_ds")) - outlet_events["test_outlet_mdsa_dsa_2"].add(Dataset("test_outlet_mdsa_ds")) + outlet_events[dsa_name_1].add(Dataset(ds_uri)) + outlet_events[dsa_name_2].add(Dataset(ds_uri)) + outlet_events[dsa_name_3].add(Dataset(ds_uri), extra={"k": "v"}) producer() @@ -2518,31 +2527,41 @@ def producer(*, outlet_events): select(DatasetEvent).where(DatasetEvent.source_task_id == "producer") ).fetchall() - assert len(producer_events) == 1 - - producer_event = producer_events[0][0] - assert producer_event.source_task_id == "producer" - assert producer_event.source_dag_id == "producer_dag" - assert producer_event.source_run_id == "test" - assert producer_event.source_map_index == -1 - assert producer_event.dataset.uri == "test_outlet_mdsa_ds" - assert producer_event.extra == {} + assert len(producer_events) == 2 + for row in producer_events: + producer_event = row[0] + assert producer_event.source_task_id == "producer" + assert producer_event.source_dag_id == "producer_dag" + assert producer_event.source_run_id == "test" + assert producer_event.source_map_index == -1 + assert producer_event.dataset.uri == ds_uri + + if not producer_event.extra: + assert producer_event.extra == {} + assert len(producer_event.source_aliases) == 2 + assert {alias.name for alias in producer_event.source_aliases} == {dsa_name_1, dsa_name_2} + else: + assert producer_event.extra == {"k": "v"} + assert len(producer_event.source_aliases) == 1 + assert producer_event.source_aliases[0].name == dsa_name_3 def test_outlet_dataset_alias_through_metadata(self, dag_maker, session): from airflow.datasets import DatasetAlias from airflow.datasets.metadata import Metadata + dsa_name = "test_outlet_dataset_alias_through_metadata_dsa" + ds1 = DatasetModel(id=1, uri="test_outlet_dataset_alias_through_metadata_ds") session.add(ds1) session.commit() with dag_maker(dag_id="producer_dag", schedule=None, session=session) as dag: - @task(outlets=DatasetAlias("test_outlet_dataset_alias_through_metadata_dsa")) + @task(outlets=DatasetAlias(dsa_name)) def producer(*, outlet_events): yield Metadata( "test_outlet_dataset_alias_through_metadata_ds", extra={"key": "value"}, - alias="test_outlet_dataset_alias_through_metadata_dsa", + alias=dsa_name, ) producer() @@ -2563,17 +2582,18 @@ def producer(*, outlet_events): assert producer_event.source_map_index == -1 assert producer_event.dataset.uri == "test_outlet_dataset_alias_through_metadata_ds" assert producer_event.extra == {"key": "value"} + assert len(producer_event.source_aliases) == 1 + assert producer_event.source_aliases[0].name == dsa_name def test_outlet_dataset_alias_dataset_not_exists(self, dag_maker, session): from airflow.datasets import Dataset, DatasetAlias + dsa_name = "test_outlet_dataset_alias_dataset_not_exists_dsa" with dag_maker(dag_id="producer_dag", schedule=None, session=session) as dag: - @task(outlets=DatasetAlias("test_outlet_dataset_alias_dataset_not_exists_dsa")) + @task(outlets=DatasetAlias(dsa_name)) def producer(*, outlet_events): - outlet_events["test_outlet_dataset_alias_dataset_not_exists_dsa"].add( - Dataset("did_not_exists"), extra={"key": "value"} - ) + outlet_events[dsa_name].add(Dataset("did_not_exists"), extra={"key": "value"}) producer() @@ -2593,6 +2613,8 @@ def producer(*, outlet_events): assert producer_event.source_map_index == -1 assert producer_event.dataset.uri == "did_not_exists" assert producer_event.extra == {"key": "value"} + assert len(producer_event.source_aliases) == 1 + assert producer_event.source_aliases[0].name == dsa_name def test_inlet_dataset_extra(self, dag_maker, session): from airflow.datasets import Dataset