Link dataset event to dataset alias (#40723)
* feat(dataset_alias)
    * add dataset_alias_dataset_event association table
    * link dataset_alias to triggered dataset events
    * add FK to dataset_alias_dataset_event_assocation_table
Lee-W committed Jul 15, 2024
1 parent 3805050 commit 8ce3dc8
Showing 9 changed files with 1,562 additions and 1,390 deletions.
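For context, the feature being wired up here looks roughly like this from the DAG author's side: a task declares a DatasetAlias outlet and attaches a dataset event to it at runtime, and this commit is what persists the link between that event and the alias. A minimal sketch, assuming the DatasetAlias outlet API shipped in Airflow 2.10 (the alias name, URI, and extra below are placeholders):

from airflow.datasets import Dataset, DatasetAlias
from airflow.decorators import task

@task(outlets=[DatasetAlias("my-alias")])
def produce(*, outlet_events):
    # Attribute a dataset event to the alias; after this commit the
    # link is also recorded in the new dataset_alias_dataset_event table.
    outlet_events["my-alias"].add(Dataset("s3://bucket/my-file"), extra={"row_count": 42})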
8 changes: 8 additions & 0 deletions airflow/datasets/manager.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

from collections.abc import Iterable
from typing import TYPE_CHECKING

from sqlalchemy import exc, select
@@ -28,6 +29,7 @@
from airflow.listeners.listener import get_listener_manager
from airflow.models.dataset import (
DagScheduleDatasetReference,
DatasetAliasModel,
DatasetDagRunQueue,
DatasetEvent,
DatasetModel,
@@ -73,6 +75,7 @@ def register_dataset_change(
dataset: Dataset,
extra=None,
session: Session = NEW_SESSION,
source_alias_names: Iterable[str] | None = None,
**kwargs,
) -> DatasetEvent | None:
"""
@@ -105,6 +108,11 @@
}
)
dataset_event = DatasetEvent(**event_kwargs)
if source_alias_names:
dataset_alias_models = session.scalars(
select(DatasetAliasModel).where(DatasetAliasModel.name.in_(source_alias_names))
)
dataset_event.source_aliases.extend(dataset_alias_models)
session.add(dataset_event)
session.flush()

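The new source_alias_names parameter resolves the given names to DatasetAliasModel rows with a single IN query and attaches them to the event via its source_aliases relationship before the flush. A minimal sketch of the call, assuming an active session and a task instance ti are in scope:

from airflow.datasets import Dataset
from airflow.datasets.manager import dataset_manager

dataset_manager.register_dataset_change(
    task_instance=ti,
    dataset=Dataset("s3://bucket/my-file"),
    extra={"row_count": 42},
    session=session,
    # Names with no matching alias row contribute nothing to the event.
    source_alias_names={"my-alias"},
)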
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add dataset_alias_dataset_event.

Revision ID: ec3471c1e067
Revises: 05e19f3176be
Create Date: 2024-07-11 09:42:00.643179
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "ec3471c1e067"
down_revision = "05e19f3176be"
branch_labels = None
depends_on = None
airflow_version = "2.10.0"


def upgrade():
"""Add dataset_alias_dataset_event table."""
op.create_table(
"dataset_alias_dataset_event",
sa.Column("alias_id", sa.Integer(), nullable=False),
sa.Column("event_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["alias_id"],
["dataset_alias.id"],
name=op.f("dataset_alias_dataset_event_alias_id_fkey"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["event_id"],
["dataset_event.id"],
name=op.f("dataset_alias_dataset_event_event_id_fkey"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("alias_id", "event_id", name=op.f("dataset_alias_dataset_event_pkey")),
)
with op.batch_alter_table("dataset_alias_dataset_event", schema=None) as batch_op:
batch_op.create_index("idx_dataset_alias_dataset_event_alias_id", ["alias_id"], unique=False)
batch_op.create_index("idx_dataset_alias_dataset_event_event_id", ["event_id"], unique=False)


def downgrade():
"""Drop dataset_alias_dataset_event table."""
op.drop_table("dataset_alias_dataset_event")
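After running "airflow db upgrade", the new table can be sanity-checked with SQLAlchemy's inspector. An illustrative sketch, assuming an initialized Airflow metadata database (settings.engine is only populated once the ORM is configured):

from sqlalchemy import inspect

from airflow import settings

insp = inspect(settings.engine)
# Composite primary key over both FK columns
print(insp.get_pk_constraint("dataset_alias_dataset_event"))
# The two secondary indexes created in the batch_alter_table block
print([ix["name"] for ix in insp.get_indexes("dataset_alias_dataset_event")])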
27 changes: 27 additions & 0 deletions airflow/models/dataset.py
@@ -284,6 +284,27 @@ def __repr__(self):
Index("idx_dagrun_dataset_events_event_id", "event_id"),
)

dataset_alias_dataset_event_assocation_table = Table(
"dataset_alias_dataset_event",
Base.metadata,
Column("alias_id", ForeignKey("dataset_alias.id", ondelete="CASCADE"), primary_key=True),
Column("event_id", ForeignKey("dataset_event.id", ondelete="CASCADE"), primary_key=True),
Index("idx_dataset_alias_dataset_event_alias_id", "alias_id"),
Index("idx_dataset_alias_dataset_event_event_id", "event_id"),
ForeignKeyConstraint(
("alias_id",),
["dataset_alias.id"],
name="dss_de_alias_id",
ondelete="CASCADE",
),
ForeignKeyConstraint(
("event_id",),
["dataset_event.id"],
name="dss_de_event_id",
ondelete="CASCADE",
),
)


class DatasetEvent(Base):
"""
@@ -322,6 +343,12 @@ class DatasetEvent(Base):
backref="consumed_dataset_events",
)

source_aliases = relationship(
"DatasetAliasModel",
secondary=dataset_alias_dataset_event_assocation_table,
backref="dataset_events",
)

source_task_instance = relationship(
"TaskInstance",
primaryjoin="""and_(
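With the association table and the dataset_events backref in place, the link can be traversed from either side. A sketch, assuming an active SQLAlchemy session and an existing alias named "my-alias":

from sqlalchemy import select

from airflow.models.dataset import DatasetAliasModel, DatasetEvent

# Event side: all events emitted through the alias.
events = session.scalars(
    select(DatasetEvent)
    .join(DatasetEvent.source_aliases)
    .where(DatasetAliasModel.name == "my-alias")
).all()

# Alias side: the same rows via the backref added here.
alias = session.scalar(select(DatasetAliasModel).where(DatasetAliasModel.name == "my-alias"))
events = alias.dataset_events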
9 changes: 5 additions & 4 deletions airflow/models/taskinstance.py
@@ -2915,7 +2915,7 @@ def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Se
# A task triggers only one dataset event for each dataset with the same extra.
# This mapping of tuple[dataset uri, extra] to sets of alias names is used to find
# datasets with the same URI but different extras, for which more than one dataset event must be emitted.
- dataset_tuple_to_aliases_mapping: dict[tuple[str, frozenset], set[str]] = defaultdict(set)
+ dataset_tuple_to_alias_names_mapping: dict[tuple[str, frozenset], set[str]] = defaultdict(set)
for obj in self.task.outlets or []:
self.log.debug("outlet obj %s", obj)
# Lineage can have other types of objects besides datasets
@@ -2933,10 +2933,10 @@ def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Se
frozen_extra = frozenset(extra.items())
dataset_alias_name = dataset_alias_event["source_alias_name"]

- dataset_tuple_to_aliases_mapping[(dataset_uri, frozen_extra)].add(dataset_alias_name)
+ dataset_tuple_to_alias_names_mapping[(dataset_uri, frozen_extra)].add(dataset_alias_name)

dataset_objs_cache: dict[str, DatasetModel] = {}
- for (uri, extra_items), aliases in dataset_tuple_to_aliases_mapping.items():
+ for (uri, extra_items), alias_names in dataset_tuple_to_alias_names_mapping.items():
if uri not in dataset_objs_cache:
dataset_obj = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri).limit(1))
dataset_objs_cache[uri] = dataset_obj
@@ -2954,13 +2954,14 @@ def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Se
'Create dataset event Dataset(uri="%s", extra="%s") through dataset aliases "%s"',
uri,
extra,
", ".join(aliases),
", ".join(alias_names),
)
dataset_manager.register_dataset_change(
task_instance=self,
dataset=dataset_obj,
extra=extra,
session=session,
+ source_alias_names=alias_names,
)

def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session):
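Read as a diff, the grouping is easy to miss: events are keyed by (uri, frozenset(extra)) so that one dataset event is registered per distinct extra, carrying every alias that produced it. A self-contained sketch of that dedup step (the event dicts and their field names below are illustrative, not the real accessor types):

from collections import defaultdict

alias_events = [
    {"uri": "s3://bucket/a", "extra": {"k": 1}, "source_alias_name": "alias-1"},
    {"uri": "s3://bucket/a", "extra": {"k": 1}, "source_alias_name": "alias-2"},
    {"uri": "s3://bucket/a", "extra": {"k": 2}, "source_alias_name": "alias-1"},
]

grouped: dict[tuple[str, frozenset], set[str]] = defaultdict(set)
for event in alias_events:
    # frozenset makes the (unhashable) extra dict usable inside a dict key
    grouped[(event["uri"], frozenset(event["extra"].items()))].add(event["source_alias_name"])

# Two distinct (uri, extra) keys -> two dataset events; the first carries
# both alias names, the second only "alias-1".
for (uri, extra_items), alias_names in grouped.items():
    print(uri, dict(extra_items), sorted(alias_names))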
2 changes: 1 addition & 1 deletion airflow/utils/db.py
@@ -118,7 +118,7 @@ class MappedClassProtocol(Protocol):
"2.8.1": "88344c1d9134",
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "05e19f3176be",
"2.10.0": "ec3471c1e067",
}


2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
- a349dbd245d5e7ddf7a3b0dc4d50bc9c779d26affc9a3332d0e0e8953a454980
+ 88c9ce0742c54f376a3c600600d06ac9a6f80a3a2cb85dfcf2472d1c77ed75db