-
Notifications
You must be signed in to change notification settings - Fork 14.4k
/
dag.py
4088 lines (3569 loc) · 162 KB
/
dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import asyncio
import collections
import copy
import functools
import itertools
import logging
import os
import pathlib
import pickle
import sys
import time
import traceback
import warnings
import weakref
from collections import deque
from contextlib import ExitStack
from datetime import datetime, timedelta
from inspect import signature
from typing import (
TYPE_CHECKING,
Any,
Callable,
Collection,
Container,
Iterable,
Iterator,
List,
Pattern,
Sequence,
Union,
cast,
overload,
)
from urllib.parse import urlsplit
import jinja2
import pendulum
import re2
from dateutil.relativedelta import relativedelta
from sqlalchemy import (
Boolean,
Column,
ForeignKey,
Index,
Integer,
String,
Text,
and_,
case,
func,
not_,
or_,
select,
update,
)
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import backref, joinedload, relationship
from sqlalchemy.sql import Select, expression
import airflow.templates
from airflow import settings, utils
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf as airflow_conf, secrets_backend_list
from airflow.datasets.manager import dataset_manager
from airflow.exceptions import (
AirflowDagInconsistent,
AirflowException,
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
ParamValidationError,
RemovedInAirflow3Warning,
TaskDeferred,
TaskNotFound,
)
from airflow.jobs.job import run_job
from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback
from airflow.models.base import Base, StringID
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagcode import DagCode
from airflow.models.dagpickle import DagPickle
from airflow.models.dagrun import RUN_ID_REGEX, DagRun
from airflow.models.param import DagParam, ParamsDict
from airflow.models.taskinstance import (
Context,
TaskInstance,
TaskInstanceKey,
clear_task_instances,
)
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.stats import Stats
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.timetables.simple import (
ContinuousTimetable,
DatasetTriggeredTimetable,
NullTimetable,
OnceTimetable,
)
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.decorators import fixup_decorator_warning_stack
from airflow.utils.helpers import at_most_one, exactly_one, validate_key
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import (
Interval,
UtcDateTime,
lock_rows,
skip_locked,
tuple_in_condition,
with_row_locks,
)
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType
if TYPE_CHECKING:
from types import ModuleType
from pendulum.tz.timezone import Timezone
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session
from airflow.datasets import Dataset
from airflow.decorators import TaskDecoratorCollection
from airflow.models.dagbag import DagBag
from airflow.models.operator import Operator
from airflow.models.slamiss import SlaMiss
from airflow.serialization.pydantic.dag import DagModelPydantic
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.typing_compat import Literal
from airflow.utils.task_group import TaskGroup
log = logging.getLogger(__name__)
DEFAULT_VIEW_PRESETS = ["grid", "graph", "duration", "gantt", "landing_times"]
ORIENTATION_PRESETS = ["LR", "TB", "RL", "BT"]
TAG_MAX_LEN = 100
DagStateChangeCallback = Callable[[Context], None]
ScheduleInterval = Union[None, str, timedelta, relativedelta]
# FIXME: Ideally this should be Union[Literal[NOTSET], ScheduleInterval],
# but Mypy cannot handle that right now. Track progress of PEP 661 for progress.
# See also: https://discuss.python.org/t/9126/7
ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval]
ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, Collection["Dataset"]]
SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
# Backward compatibility: If neither schedule_interval nor timetable is
# *provided by the user*, default to a one-day interval.
DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)
class InconsistentDataInterval(AirflowException):
"""Exception raised when a model populates data interval fields incorrectly.
The data interval fields should either both be None (for runs scheduled
prior to AIP-39), or both be datetime (for runs scheduled after AIP-39 is
implemented). This is raised if exactly one of the fields is None.
"""
_template = (
"Inconsistent {cls}: {start[0]}={start[1]!r}, {end[0]}={end[1]!r}, "
"they must be either both None or both datetime"
)
def __init__(self, instance: Any, start_field_name: str, end_field_name: str) -> None:
self._class_name = type(instance).__name__
self._start_field = (start_field_name, getattr(instance, start_field_name))
self._end_field = (end_field_name, getattr(instance, end_field_name))
def __str__(self) -> str:
return self._template.format(cls=self._class_name, start=self._start_field, end=self._end_field)
def _get_model_data_interval(
instance: Any,
start_field_name: str,
end_field_name: str,
) -> DataInterval | None:
start = timezone.coerce_datetime(getattr(instance, start_field_name))
end = timezone.coerce_datetime(getattr(instance, end_field_name))
if start is None:
if end is not None:
raise InconsistentDataInterval(instance, start_field_name, end_field_name)
return None
elif end is None:
raise InconsistentDataInterval(instance, start_field_name, end_field_name)
return DataInterval(start, end)
def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timetable:
"""Create a Timetable instance from a ``schedule_interval`` argument."""
if interval is NOTSET:
return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
if interval is None:
return NullTimetable()
if interval == "@once":
return OnceTimetable()
if interval == "@continuous":
return ContinuousTimetable()
if isinstance(interval, (timedelta, relativedelta)):
return DeltaDataIntervalTimetable(interval)
if isinstance(interval, str):
if airflow_conf.getboolean("scheduler", "create_cron_data_intervals"):
return CronDataIntervalTimetable(interval, timezone)
else:
return CronTriggerTimetable(interval, timezone=timezone)
raise ValueError(f"{interval!r} is not a valid schedule_interval.")
def get_last_dagrun(dag_id, session, include_externally_triggered=False):
"""
Return the last dag run for a dag, None if there was none.
Last dag run can be any type of run e.g. scheduled or backfilled.
Overridden DagRuns are ignored.
"""
DR = DagRun
query = select(DR).where(DR.dag_id == dag_id)
if not include_externally_triggered:
query = query.where(DR.external_trigger == expression.false())
query = query.order_by(DR.execution_date.desc())
return session.scalar(query.limit(1))
def get_dataset_triggered_next_run_info(
dag_ids: list[str], *, session: Session
) -> dict[str, dict[str, int | str]]:
"""
Get next run info for a list of dag_ids.
Given a list of dag_ids, get string representing how close any that are dataset triggered are
their next run, e.g. "1 of 2 datasets updated".
"""
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ, DatasetModel
return {
x.dag_id: {
"uri": x.uri,
"ready": x.ready,
"total": x.total,
}
for x in session.execute(
select(
DagScheduleDatasetReference.dag_id,
# This is a dirty hack to workaround group by requiring an aggregate,
# since grouping by dataset is not what we want to do here...but it works
case((func.count() == 1, func.max(DatasetModel.uri)), else_="").label("uri"),
func.count().label("total"),
func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)).label("ready"),
)
.join(
DDRQ,
and_(
DDRQ.dataset_id == DagScheduleDatasetReference.dataset_id,
DDRQ.target_dag_id == DagScheduleDatasetReference.dag_id,
),
isouter=True,
)
.join(DatasetModel, DatasetModel.id == DagScheduleDatasetReference.dataset_id)
.group_by(DagScheduleDatasetReference.dag_id)
.where(DagScheduleDatasetReference.dag_id.in_(dag_ids))
).all()
}
def _triggerer_is_healthy():
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
job = TriggererJobRunner.most_recent_job()
return job and job.is_alive()
@functools.total_ordering
class DAG(LoggingMixin):
"""
A dag (directed acyclic graph) is a collection of tasks with directional dependencies.
A dag also has a schedule, a start date and an end date (optional). For each schedule,
(say daily or hourly), the DAG needs to run each individual tasks as their dependencies
are met. Certain tasks have the property of depending on their own past, meaning that
they can't run until their previous schedule (and upstream tasks) are completed.
DAGs essentially act as namespaces for tasks. A task_id can only be
added once to a DAG.
Note that if you plan to use time zones all the dates provided should be pendulum
dates. See :ref:`timezone_aware_dags`.
.. versionadded:: 2.4
The *schedule* argument to specify either time-based scheduling logic
(timetable), or dataset-driven triggers.
.. deprecated:: 2.4
The arguments *schedule_interval* and *timetable*. Their functionalities
are merged into the new *schedule* argument.
:param dag_id: The id of the DAG; must consist exclusively of alphanumeric
characters, dashes, dots and underscores (all ASCII)
:param description: The description for the DAG to e.g. be shown on the webserver
:param schedule: Defines the rules according to which DAG runs are scheduled. Can
accept cron string, timedelta object, Timetable, or list of Dataset objects.
If this is not provided, the DAG will be set to the default
schedule ``timedelta(days=1)``. See also :doc:`/howto/timetable`.
:param start_date: The timestamp from which the scheduler will
attempt to backfill
:param end_date: A date beyond which your DAG won't run, leave to None
for open-ended scheduling
:param template_searchpath: This list of folders (non-relative)
defines where jinja will look for your templates. Order matters.
Note that jinja/airflow includes the path of your DAG file by
default
:param template_undefined: Template undefined type.
:param user_defined_macros: a dictionary of macros that will be exposed
in your jinja templates. For example, passing ``dict(foo='bar')``
to this argument allows you to ``{{ foo }}`` in all jinja
templates related to this DAG. Note that you can pass any
type of object here.
:param user_defined_filters: a dictionary of filters that will be exposed
in your jinja templates. For example, passing
``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows
you to ``{{ 'world' | hello }}`` in all jinja templates related to
this DAG.
:param default_args: A dictionary of default parameters to be used
as constructor keyword parameters when initialising operators.
Note that operators have the same hook, and precede those defined
here, meaning that if your dict contains `'depends_on_past': True`
here and `'depends_on_past': False` in the operator's call
`default_args`, the actual value will be `False`.
:param params: a dictionary of DAG level parameters that are made
accessible in templates, namespaced under `params`. These
params can be overridden at the task level.
:param max_active_tasks: the number of task instances allowed to run
concurrently
:param max_active_runs: maximum number of active DAG runs, beyond this
number of DAG runs in a running state, the scheduler won't create
new active DAG runs
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created.
:param sla_miss_callback: specify a function or list of functions to call when reporting SLA
timeouts. See :ref:`sla_miss_callback<concepts:sla_miss_callback>` for
more information about the function signature and parameters that are
passed to the callback.
:param default_view: Specify DAG default view (grid, graph, duration,
gantt, landing_times), default grid
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT), default LR
:param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
:param on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails.
A context dictionary is passed as a single parameter to this function.
:param on_success_callback: Much like the ``on_failure_callback`` except
that it is executed when the dag succeeds.
:param access_control: Specify optional DAG-level actions, e.g.,
"{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}"
:param is_paused_upon_creation: Specifies if the dag is paused when created for the first time.
If the dag exists already, this flag will be ignored. If this optional parameter
is not specified, the global config setting will be used.
:param jinja_environment_kwargs: additional configuration options to be passed to Jinja
``Environment`` for template rendering
**Example**: to avoid Jinja from removing a trailing newline from template strings ::
DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# some other jinja2 Environment options here
},
)
**See**: `Jinja Environment documentation
<https://jinja.palletsprojects.com/en/2.11.x/api/#jinja2.Environment>`_
:param render_template_as_native_obj: If True, uses a Jinja ``NativeEnvironment``
to render templates as native Python types. If False, a Jinja
``Environment`` is used to render templates as string values.
:param tags: List of tags to help filtering DAGs in the UI.
:param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI.
Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
e.g: {"dag_owner": "https://airflow.apache.org/"}
:param auto_register: Automatically register this DAG when it is used in a ``with`` block
:param fail_stop: Fails currently running tasks when task in DAG fails.
**Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success").
An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
"""
_comps = {
"dag_id",
"task_ids",
"parent_dag",
"start_date",
"end_date",
"schedule_interval",
"fileloc",
"template_searchpath",
"last_loaded",
}
__serialized_fields: frozenset[str] | None = None
fileloc: str
"""
File path that needs to be imported to load this DAG or subdag.
This may not be an actual file on disk in the case when this DAG is loaded
from a ZIP file or other DAG distribution format.
"""
parent_dag: DAG | None = None # Gets set when DAGs are loaded
# NOTE: When updating arguments here, please also keep arguments in @dag()
# below in sync. (Search for 'def dag(' in this file.)
def __init__(
self,
dag_id: str,
description: str | None = None,
schedule: ScheduleArg = NOTSET,
schedule_interval: ScheduleIntervalArg = NOTSET,
timetable: Timetable | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
full_filepath: str | None = None,
template_searchpath: str | Iterable[str] | None = None,
template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
user_defined_macros: dict | None = None,
user_defined_filters: dict | None = None,
default_args: dict | None = None,
concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
dagrun_timeout: timedelta | None = None,
sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
orientation: str = airflow_conf.get_mandatory_value("webserver", "dag_orientation"),
catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"),
on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
doc_md: str | None = None,
params: collections.abc.MutableMapping | None = None,
access_control: dict | None = None,
is_paused_upon_creation: bool | None = None,
jinja_environment_kwargs: dict | None = None,
render_template_as_native_obj: bool = False,
tags: list[str] | None = None,
owner_links: dict[str, str] | None = None,
auto_register: bool = True,
fail_stop: bool = False,
):
from airflow.utils.task_group import TaskGroup
if tags and any(len(tag) > TAG_MAX_LEN for tag in tags):
raise AirflowException(f"tag cannot be longer than {TAG_MAX_LEN} characters")
self.owner_links = owner_links or {}
self.user_defined_macros = user_defined_macros
self.user_defined_filters = user_defined_filters
if default_args and not isinstance(default_args, dict):
raise TypeError("default_args must be a dict")
self.default_args = copy.deepcopy(default_args or {})
params = params or {}
# merging potentially conflicting default_args['params'] into params
if "params" in self.default_args:
params.update(self.default_args["params"])
del self.default_args["params"]
# check self.params and convert them into ParamsDict
self.params = ParamsDict(params)
if full_filepath:
warnings.warn(
"Passing full_filepath to DAG() is deprecated and has no effect",
RemovedInAirflow3Warning,
stacklevel=2,
)
validate_key(dag_id)
self._dag_id = dag_id
if concurrency:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The 'concurrency' parameter is deprecated. Please use 'max_active_tasks'.",
RemovedInAirflow3Warning,
stacklevel=2,
)
max_active_tasks = concurrency
self._max_active_tasks = max_active_tasks
self._pickle_id: int | None = None
self._description = description
# set file location to caller source path
back = sys._getframe().f_back
self.fileloc = back.f_code.co_filename if back else ""
self.task_dict: dict[str, Operator] = {}
# set timezone from start_date
tz = None
if start_date and start_date.tzinfo:
tzinfo = None if start_date.tzinfo else settings.TIMEZONE
tz = pendulum.instance(start_date, tz=tzinfo).timezone
elif "start_date" in self.default_args and self.default_args["start_date"]:
date = self.default_args["start_date"]
if not isinstance(date, datetime):
date = timezone.parse(date)
self.default_args["start_date"] = date
start_date = date
tzinfo = None if date.tzinfo else settings.TIMEZONE
tz = pendulum.instance(date, tz=tzinfo).timezone
self.timezone: Timezone = tz or settings.TIMEZONE
# Apply the timezone we settled on to end_date if it wasn't supplied
if "end_date" in self.default_args and self.default_args["end_date"]:
if isinstance(self.default_args["end_date"], str):
self.default_args["end_date"] = timezone.parse(
self.default_args["end_date"], timezone=self.timezone
)
self.start_date = timezone.convert_to_utc(start_date)
self.end_date = timezone.convert_to_utc(end_date)
# also convert tasks
if "start_date" in self.default_args:
self.default_args["start_date"] = timezone.convert_to_utc(self.default_args["start_date"])
if "end_date" in self.default_args:
self.default_args["end_date"] = timezone.convert_to_utc(self.default_args["end_date"])
# sort out DAG's scheduling behavior
scheduling_args = [schedule_interval, timetable, schedule]
has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args)
has_empty_start_date = not ("start_date" in self.default_args or self.start_date)
if has_scheduling_args and has_empty_start_date:
raise ValueError("DAG is missing the start_date parameter")
if not at_most_one(*scheduling_args):
raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.")
if schedule_interval is not NOTSET:
warnings.warn(
"Param `schedule_interval` is deprecated and will be removed in a future release. "
"Please use `schedule` instead. ",
RemovedInAirflow3Warning,
stacklevel=2,
)
if timetable is not None:
warnings.warn(
"Param `timetable` is deprecated and will be removed in a future release. "
"Please use `schedule` instead. ",
RemovedInAirflow3Warning,
stacklevel=2,
)
self.timetable: Timetable
self.schedule_interval: ScheduleInterval
self.dataset_triggers: Collection[Dataset] = []
if isinstance(schedule, Collection) and not isinstance(schedule, str):
from airflow.datasets import Dataset
if not all(isinstance(x, Dataset) for x in schedule):
raise ValueError("All elements in 'schedule' should be datasets")
self.dataset_triggers = list(schedule)
elif isinstance(schedule, Timetable):
timetable = schedule
elif schedule is not NOTSET:
schedule_interval = schedule
if self.dataset_triggers:
self.timetable = DatasetTriggeredTimetable()
self.schedule_interval = self.timetable.summary
elif timetable:
self.timetable = timetable
self.schedule_interval = self.timetable.summary
else:
if isinstance(schedule_interval, ArgNotSet):
schedule_interval = DEFAULT_SCHEDULE_INTERVAL
self.schedule_interval = schedule_interval
self.timetable = create_timetable(schedule_interval, self.timezone)
if isinstance(template_searchpath, str):
template_searchpath = [template_searchpath]
self.template_searchpath = template_searchpath
self.template_undefined = template_undefined
self.last_loaded: datetime = timezone.utcnow()
self.safe_dag_id = dag_id.replace(".", "__dot__")
self.max_active_runs = max_active_runs
if self.timetable.active_runs_limit is not None:
if self.timetable.active_runs_limit < self.max_active_runs:
raise AirflowException(
f"Invalid max_active_runs: {type(self.timetable)} "
f"requires max_active_runs <= {self.timetable.active_runs_limit}"
)
self.dagrun_timeout = dagrun_timeout
self.sla_miss_callback = sla_miss_callback
if default_view in DEFAULT_VIEW_PRESETS:
self._default_view: str = default_view
elif default_view == "tree":
warnings.warn(
"`default_view` of 'tree' has been renamed to 'grid' -- please update your DAG",
RemovedInAirflow3Warning,
stacklevel=2,
)
self._default_view = "grid"
else:
raise AirflowException(
f"Invalid values of dag.default_view: only support "
f"{DEFAULT_VIEW_PRESETS}, but get {default_view}"
)
if orientation in ORIENTATION_PRESETS:
self.orientation = orientation
else:
raise AirflowException(
f"Invalid values of dag.orientation: only support "
f"{ORIENTATION_PRESETS}, but get {orientation}"
)
self.catchup: bool = catchup
self.partial: bool = False
self.on_success_callback = on_success_callback
self.on_failure_callback = on_failure_callback
# Keeps track of any extra edge metadata (sparse; will not contain all
# edges, so do not iterate over it for that). Outer key is upstream
# task ID, inner key is downstream task ID.
self.edge_info: dict[str, dict[str, EdgeInfoType]] = {}
# To keep it in parity with Serialized DAGs
# and identify if DAG has on_*_callback without actually storing them in Serialized JSON
self.has_on_success_callback: bool = self.on_success_callback is not None
self.has_on_failure_callback: bool = self.on_failure_callback is not None
self._access_control = DAG._upgrade_outdated_dag_access_control(access_control)
self.is_paused_upon_creation = is_paused_upon_creation
self.auto_register = auto_register
self.fail_stop: bool = fail_stop
self.jinja_environment_kwargs = jinja_environment_kwargs
self.render_template_as_native_obj = render_template_as_native_obj
self.doc_md = self.get_doc_md(doc_md)
self.tags = tags or []
self._task_group = TaskGroup.create_root(self)
self.validate_schedule_and_params()
wrong_links = dict(self.iter_invalid_owner_links())
if wrong_links:
raise AirflowException(
"Wrong link format was used for the owner. Use a valid link \n"
f"Bad formatted links are: {wrong_links}"
)
# this will only be set at serialization time
# it's only use is for determining the relative
# fileloc based only on the serialize dag
self._processor_dags_folder = None
def get_doc_md(self, doc_md: str | None) -> str | None:
if doc_md is None:
return doc_md
env = self.get_template_env(force_sandboxed=True)
if not doc_md.endswith(".md"):
template = jinja2.Template(doc_md)
else:
try:
template = env.get_template(doc_md)
except jinja2.exceptions.TemplateNotFound:
return f"""
# Templating Error!
Not able to find the template file: `{doc_md}`.
"""
return template.render()
def _check_schedule_interval_matches_timetable(self) -> bool:
"""Check ``schedule_interval`` and ``timetable`` match.
This is done as a part of the DAG validation done before it's bagged, to
guard against the DAG's ``timetable`` (or ``schedule_interval``) from
being changed after it's created, e.g.
.. code-block:: python
dag1 = DAG("d1", timetable=MyTimetable())
dag1.schedule_interval = "@once"
dag2 = DAG("d2", schedule="@once")
dag2.timetable = MyTimetable()
Validation is done by creating a timetable and check its summary matches
``schedule_interval``. The logic is not bullet-proof, especially if a
custom timetable does not provide a useful ``summary``. But this is the
best we can do.
"""
if self.schedule_interval == self.timetable.summary:
return True
try:
timetable = create_timetable(self.schedule_interval, self.timezone)
except ValueError:
return False
return timetable.summary == self.timetable.summary
def validate(self):
"""Validate the DAG has a coherent setup.
This is called by the DAG bag before bagging the DAG.
"""
if not self._check_schedule_interval_matches_timetable():
raise AirflowDagInconsistent(
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
self.validate_schedule_and_params()
self.timetable.validate()
self.validate_setup_teardown()
def validate_setup_teardown(self):
"""
Validate that setup and teardown tasks are configured properly.
:meta private:
"""
for task in self.tasks:
if task.is_setup:
for down_task in task.downstream_list:
if not down_task.is_teardown and down_task.trigger_rule != TriggerRule.ALL_SUCCESS:
# todo: we can relax this to allow out-of-scope tasks to have other trigger rules
# this is required to ensure consistent behavior of dag
# when clearing an indirect setup
raise ValueError("Setup tasks must be followed with trigger rule ALL_SUCCESS.")
FailStopDagInvalidTriggerRule.check(dag=self, trigger_rule=task.trigger_rule)
def __repr__(self):
return f"<DAG: {self.dag_id}>"
def __eq__(self, other):
if type(self) == type(other):
# Use getattr() instead of __dict__ as __dict__ doesn't return
# correct values for properties.
return all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps)
return False
def __ne__(self, other):
return not self == other
def __lt__(self, other):
return self.dag_id < other.dag_id
def __hash__(self):
hash_components = [type(self)]
for c in self._comps:
# task_ids returns a list and lists can't be hashed
if c == "task_ids":
val = tuple(self.task_dict)
else:
val = getattr(self, c, None)
try:
hash(val)
hash_components.append(val)
except TypeError:
hash_components.append(repr(val))
return hash(tuple(hash_components))
# Context Manager -----------------------------------------------
def __enter__(self):
DagContext.push_context_managed_dag(self)
return self
def __exit__(self, _type, _value, _tb):
DagContext.pop_context_managed_dag()
# /Context Manager ----------------------------------------------
@staticmethod
def _upgrade_outdated_dag_access_control(access_control=None):
"""
Look for outdated dag level actions in DAG access_controls and replace them with updated actions.
For example, in DAG access_control {'role1': {'can_dag_read'}} 'can_dag_read'
will be replaced with 'can_read', in {'role2': {'can_dag_read', 'can_dag_edit'}}
'can_dag_edit' will be replaced with 'can_edit', etc.
"""
if access_control is None:
return None
new_perm_mapping = {
permissions.DEPRECATED_ACTION_CAN_DAG_READ: permissions.ACTION_CAN_READ,
permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: permissions.ACTION_CAN_EDIT,
}
updated_access_control = {}
for role, perms in access_control.items():
updated_access_control[role] = {new_perm_mapping.get(perm, perm) for perm in perms}
if access_control != updated_access_control:
warnings.warn(
"The 'can_dag_read' and 'can_dag_edit' permissions are deprecated. "
"Please use 'can_read' and 'can_edit', respectively.",
RemovedInAirflow3Warning,
stacklevel=3,
)
return updated_access_control
def date_range(
self,
start_date: pendulum.DateTime,
num: int | None = None,
end_date: datetime | None = None,
) -> list[datetime]:
message = "`DAG.date_range()` is deprecated."
if num is not None:
warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
return utils_date_range(
start_date=start_date, num=num, delta=self.normalized_schedule_interval
)
message += " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead."
warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
if end_date is None:
coerced_end_date = timezone.utcnow()
else:
coerced_end_date = end_date
it = self.iter_dagrun_infos_between(start_date, pendulum.instance(coerced_end_date), align=False)
return [info.logical_date for info in it]
def is_fixed_time_schedule(self):
"""Figures out if the schedule has a fixed time (e.g. 3 AM every day).
Detection is done by "peeking" the next two cron trigger time; if the
two times have the same minute and hour value, the schedule is fixed,
and we *don't* need to perform the DST fix.
This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
Do not try to understand what this actually means. It is old logic that
should not be used anywhere.
"""
warnings.warn(
"`DAG.is_fixed_time_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
from airflow.timetables._cron import CronMixin
if not isinstance(self.timetable, CronMixin):
return True
from croniter import croniter
cron = croniter(self.timetable._expression)
next_a = cron.get_next(datetime)
next_b = cron.get_next(datetime)
return next_b.minute == next_a.minute and next_b.hour == next_a.hour
def following_schedule(self, dttm):
"""
Calculate the following schedule for this dag in UTC.
:param dttm: utc datetime
:return: utc datetime
"""
warnings.warn(
"`DAG.following_schedule()` is deprecated. Use `DAG.next_dagrun_info(restricted=False)` instead.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
data_interval = self.infer_automated_data_interval(timezone.coerce_datetime(dttm))
next_info = self.next_dagrun_info(data_interval, restricted=False)
if next_info is None:
return None
return next_info.data_interval.start
def previous_schedule(self, dttm):
from airflow.timetables.interval import _DataIntervalTimetable
warnings.warn(
"`DAG.previous_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
if not isinstance(self.timetable, _DataIntervalTimetable):
return None
return self.timetable._get_prev(timezone.coerce_datetime(dttm))
def get_next_data_interval(self, dag_model: DagModel) -> DataInterval | None:
"""Get the data interval of the next scheduled run.
For compatibility, this method infers the data interval from the DAG's
schedule if the run does not have an explicit one set, which is possible
for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended on as a
part of the Python API.
:meta private:
"""
if self.dag_id != dag_model.dag_id:
raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")
if dag_model.next_dagrun is None: # Next run not scheduled.
return None
data_interval = dag_model.next_dagrun_data_interval
if data_interval is not None:
return data_interval
# Compatibility: A run was scheduled without an explicit data interval.
# This means the run was scheduled before AIP-39 implementation. Try to
# infer from the logical date.
return self.infer_automated_data_interval(dag_model.next_dagrun)
def get_run_data_interval(self, run: DagRun | DagRunPydantic) -> DataInterval:
"""Get the data interval of this run.
For compatibility, this method infers the data interval from the DAG's
schedule if the run does not have an explicit one set, which is possible for
runs created prior to AIP-39.
This function is private to Airflow core and should not be depended on as a
part of the Python API.
:meta private:
"""
if run.dag_id is not None and run.dag_id != self.dag_id:
raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {run.dag_id}")
data_interval = _get_model_data_interval(run, "data_interval_start", "data_interval_end")
if data_interval is not None:
return data_interval
# Compatibility: runs created before AIP-39 implementation don't have an
# explicit data interval. Try to infer from the logical date.
return self.infer_automated_data_interval(run.execution_date)
def infer_automated_data_interval(self, logical_date: datetime) -> DataInterval:
"""Infer a data interval for a run against this DAG.
This method is used to bridge runs created prior to AIP-39
implementation, which do not have an explicit data interval. Therefore,
this method only considers ``schedule_interval`` values valid prior to
Airflow 2.2.
DO NOT call this method if there is a known data interval.
:meta private:
"""
timetable_type = type(self.timetable)
if issubclass(timetable_type, (NullTimetable, OnceTimetable, DatasetTriggeredTimetable)):
return DataInterval.exact(timezone.coerce_datetime(logical_date))
start = timezone.coerce_datetime(logical_date)
if issubclass(timetable_type, CronDataIntervalTimetable):
end = cast(CronDataIntervalTimetable, self.timetable)._get_next(start)
elif issubclass(timetable_type, DeltaDataIntervalTimetable):
end = cast(DeltaDataIntervalTimetable, self.timetable)._get_next(start)
# Contributors: When the exception below is raised, you might want to
# add an 'elif' block here to handle custom timetables. Stop! The bug
# you're looking for is instead at when the DAG run (represented by
# logical_date) was created. See GH-31969 for an example:
# * Wrong fix: GH-32074 (modifies this function).
# * Correct fix: GH-32118 (modifies the DAG run creation code).
else:
raise ValueError(f"Not a valid timetable: {self.timetable!r}")
return DataInterval(start, end)
def next_dagrun_info(
self,
last_automated_dagrun: None | datetime | DataInterval,
*,
restricted: bool = True,
) -> DagRunInfo | None:
"""Get information about the next DagRun of this dag after ``date_last_automated_dagrun``.
This calculates what time interval the next DagRun should operate on
(its execution date) and when it can be scheduled, according to the