Only orphan non-orphaned Datasets
stijndehaes authored and shahar1 committed Aug 2, 2024
1 parent 5880545 commit 82832e4
Showing 2 changed files with 43 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/jobs/scheduler_job_runner.py
@@ -2070,6 +2070,7 @@ def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
                 isouter=True,
             )
             .group_by(DatasetModel.id)
+            .where(~DatasetModel.is_orphaned)
             .having(
                 and_(
                     func.count(DagScheduleDatasetReference.dag_id) == 0,
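For context, a hedged sketch of the full method this hunk lands in, reconstructed from the Airflow source of this era rather than taken verbatim from the diff; names such as TaskOutletDatasetReference and select, the log message, and the expression.true() assignment are assumptions drawn from that surrounding code:

def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
    # Reconstructed sketch, not part of this commit's diff.
    orphaned_dataset_query = session.scalars(
        select(DatasetModel)
        .join(
            DagScheduleDatasetReference,
            isouter=True,
        )
        .join(
            TaskOutletDatasetReference,
            isouter=True,
        )
        .group_by(DatasetModel.id)
        .where(~DatasetModel.is_orphaned)  # the line added by this commit
        .having(
            and_(
                func.count(DagScheduleDatasetReference.dag_id) == 0,
                func.count(TaskOutletDatasetReference.dag_id) == 0,
            )
        )
    )
    for dataset in orphaned_dataset_query:
        self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
        dataset.is_orphaned = expression.true()

The key interaction: dataset.is_orphaned is assigned a SQL expression, which always issues an UPDATE at flush time, so every dataset the query returns gets rewritten (and its updated_at bumped) on each scheduler pass. The new .where() clause keeps already-orphaned rows out of that result set entirely.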
42 changes: 42 additions & 0 deletions tests/jobs/test_scheduler_job.py
@@ -5796,6 +5796,48 @@ def test_dataset_orphaning(self, dag_maker, session):
         ]
         assert orphaned_datasets == ["ds2", "ds4"]
 
+    def test_dataset_orphaning_ignore_orphaned_datasets(self, dag_maker, session):
+        dataset1 = Dataset(uri="ds1")
+
+        with dag_maker(dag_id="datasets-1", schedule=[dataset1], session=session):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        non_orphaned_dataset_count = session.query(DatasetModel).filter(~DatasetModel.is_orphaned).count()
+        assert non_orphaned_dataset_count == 1
+        orphaned_dataset_count = session.query(DatasetModel).filter(DatasetModel.is_orphaned).count()
+        assert orphaned_dataset_count == 0
+
+        # now remove dataset1 reference
+        with dag_maker(dag_id="datasets-1", schedule=None, session=session):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+
+        self.job_runner._orphan_unreferenced_datasets(session=session)
+        session.flush()
+
+        orphaned_datasets_before_rerun = (
+            session.query(DatasetModel.updated_at, DatasetModel.uri)
+            .filter(DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        )
+        assert [dataset.uri for dataset in orphaned_datasets_before_rerun] == ["ds1"]
+        updated_at_timestamps = [dataset.updated_at for dataset in orphaned_datasets_before_rerun]
+
+        # when rerunning we should ignore the already orphaned datasets and thus the updated_at timestamp
+        # should remain the same
+        self.job_runner._orphan_unreferenced_datasets(session=session)
+        session.flush()
+
+        orphaned_datasets_after_rerun = (
+            session.query(DatasetModel.updated_at, DatasetModel.uri)
+            .filter(DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        )
+        assert [dataset.uri for dataset in orphaned_datasets_after_rerun] == ["ds1"]
+        assert updated_at_timestamps == [dataset.updated_at for dataset in orphaned_datasets_after_rerun]
+
     def test_misconfigured_dags_doesnt_crash_scheduler(self, session, dag_maker, caplog):
         """Test that if dagrun creation throws an exception, the scheduler doesn't crash"""
 
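The test pins down the observable effect: re-running the orphaning pass must leave updated_at untouched for datasets that are already orphaned. Below is a minimal standalone sketch of why the filter achieves that, using plain SQLAlchemy with a hypothetical DatasetRow model and orphan_unreferenced helper (illustrative stand-ins, not Airflow's real DatasetModel or scheduler code):

from __future__ import annotations

import datetime

from sqlalchemy import Boolean, Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.sql import expression

Base = declarative_base()


class DatasetRow(Base):  # hypothetical stand-in for Airflow's DatasetModel
    __tablename__ = "dataset"
    id = Column(Integer, primary_key=True)
    uri = Column(String, nullable=False)
    is_orphaned = Column(Boolean, default=False, nullable=False)
    # onupdate fires on every UPDATE of the row, even if only is_orphaned is written
    updated_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)


def orphan_unreferenced(session: Session) -> None:
    # The fix in miniature: skip rows that are already orphaned.
    # Assigning a SQL expression always issues an UPDATE at flush, so without
    # this filter every pass would rewrite the row and bump updated_at.
    for row in session.query(DatasetRow).filter(~DatasetRow.is_orphaned):
        row.is_orphaned = expression.true()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(DatasetRow(uri="ds1"))
    session.commit()

    orphan_unreferenced(session)
    session.commit()
    first_ts = session.query(DatasetRow.updated_at).scalar()

    orphan_unreferenced(session)  # re-run: ds1 is filtered out, its row is never written
    session.commit()
    assert session.query(DatasetRow.updated_at).scalar() == first_ts

The design point is that filtering in the query avoids the write altogether, which both saves needless UPDATEs and keeps updated_at meaningful as the time the dataset actually changed state.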
