From b9dd118d0ec4fd71fb01335062a76efc803b92e4 Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Tue, 9 Jun 2020 21:17:50 -0700
Subject: [PATCH 1/8] Change process subdag file logic

Add subdag tasks to root dag
---
 airflow/models/dagbag.py | 35 +++++++++++++++++++++++++----------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 89afbdaf78a68..f9684ed38b175 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -110,10 +110,10 @@ def __init__(
         # Only used by read_dags_from_db=True
         self.dags_last_fetched: Dict[str, datetime] = {}

-        self.collect_dags(
-            dag_folder=dag_folder,
-            include_examples=include_examples,
-            safe_mode=safe_mode)
+        # self.collect_dags(
+        #     dag_folder=dag_folder,
+        #     include_examples=include_examples,
+        #     safe_mode=safe_mode)

     def size(self) -> int:
         """
@@ -363,14 +363,29 @@ def bag_dag(self, dag, root_dag):
         for task in dag.tasks:
             settings.policy(task)

-        subdags = dag.subdags
+        # subdags = dag.subdags

         try:
-            for subdag in subdags:
-                subdag.full_filepath = dag.full_filepath
-                subdag.parent_dag = dag
-                subdag.is_subdag = True
-                self.bag_dag(dag=subdag, root_dag=root_dag)
+            from airflow.operators.subdag_operator import SubDagOperator
+            from airflow.models.baseoperator import cross_downstream
+            for task in dag.tasks:
+                if not isinstance(task, SubDagOperator):
+                    continue
+                else:
+                    subdag = task.subdag
+
+                    upstream_tasks = task.upstream_list
+                    downstream_tasks = task.downstream_list
+
+                    cross_downstream(from_tasks=upstream_tasks, to_tasks=subdag.roots)
+                    cross_downstream(from_tasks=subdag.leaves, to_tasks=downstream_tasks)
+
+                    for subdag_task in subdag.tasks:
+                        subdag_task.dag_id = root_dag.dag_id
+                        subdag_task.parent_group = parent_dag.dag_id
+                        subdag_task.current_group = task.task_id  # dag.dag_id
+
+                    self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)

             self.dags[dag.dag_id] = dag
             self.log.debug('Loaded DAG %s', dag)
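The core of this first patch is the cross_downstream rewiring: every upstream neighbour of a SubDagOperator is pointed at every root of its subdag, and every leaf of the subdag is pointed at every downstream neighbour. A minimal, dependency-free sketch of that wiring, using stand-in task objects rather than real Airflow operators (the Task class and the ids below are illustrative only):

from itertools import product


class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_task_ids = set()
        self.downstream_task_ids = set()


def cross_downstream(from_tasks, to_tasks):
    # Mirrors the behaviour of airflow.models.baseoperator.cross_downstream:
    # a full bipartite set of edges from `from_tasks` to `to_tasks`.
    for frm, to in product(from_tasks, to_tasks):
        frm.downstream_task_ids.add(to.task_id)
        to.upstream_task_ids.add(frm.task_id)


up, down = Task("extract"), Task("report")
sub_root, sub_leaf = Task("sub.load_a"), Task("sub.load_b")

cross_downstream([up], [sub_root])    # upstream neighbours -> subdag roots
cross_downstream([sub_leaf], [down])  # subdag leaves -> downstream neighbours
print(up.downstream_task_ids)         # {'sub.load_a'}
print(down.upstream_task_ids)         # {'sub.load_b'}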
From 8ca7dd65af2b73324c970f7f791f345f1bb682f4 Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Wed, 10 Jun 2020 16:24:04 -0700
Subject: [PATCH 2/8] fixup! Change process subdag file logic

---
 airflow/models/baseoperator.py |  7 +++++
 airflow/models/dagbag.py       | 47 +++++++++++++++++-----------------
 2 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 908bec26ba136..38f577d1a9acc 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1112,6 +1112,13 @@ def dry_run(self) -> None:
             self.log.info('Rendering template for %s', field)
             self.log.info(content)

+    def _remove_direct_relative_id(self, task_id: str, upstream: bool = False) -> None:
+        """Remove a task id from the direct relative upstream/downstream task ids"""
+        if upstream:
+            self._upstream_task_ids.remove(task_id)
+        else:
+            self._downstream_task_ids.remove(task_id)
+
     def get_direct_relative_ids(self, upstream: bool = False) -> Set[str]:
         """
         Get set of the direct relative ids to the current task, upstream or
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index f9684ed38b175..4f94b45d5cb8a 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -110,10 +110,10 @@ def __init__(
         # Only used by read_dags_from_db=True
         self.dags_last_fetched: Dict[str, datetime] = {}

-        # self.collect_dags(
-        #     dag_folder=dag_folder,
-        #     include_examples=include_examples,
-        #     safe_mode=safe_mode)
+        self.collect_dags(
+            dag_folder=dag_folder,
+            include_examples=include_examples,
+            safe_mode=safe_mode)

     def size(self) -> int:
         """
@@ -355,49 +355,48 @@ def bag_dag(self, dag, root_dag):
         Throws AirflowDagCycleException if a cycle is detected in this
         dag or its subdags
         """
-        test_cycle(dag)  # throws if a task cycle is found
-
         dag.resolve_template_files()
         dag.last_loaded = timezone.utcnow()

         for task in dag.tasks:
             settings.policy(task)

-        # subdags = dag.subdags
-
         try:
             from airflow.operators.subdag_operator import SubDagOperator
             from airflow.models.baseoperator import cross_downstream
-            for task in dag.tasks:
+            for task_id, task in dag.task_dict.copy().items():
                 if not isinstance(task, SubDagOperator):
                     continue
                 else:
                     subdag = task.subdag
+
+                    for subdag_task in subdag.tasks:
+                        del subdag_task._dag
+                        root_dag.add_task(subdag_task)
+                        subdag_task.parent_group = parent_dag.dag_id
+                        subdag_task.current_group = dag.dag_id

                     upstream_tasks = task.upstream_list
+                    for upstream_task in upstream_tasks:
+                        upstream_task._remove_direct_relative_id(task_id, upstream=False)
+
                     downstream_tasks = task.downstream_list
+                    for downstream_task in downstream_tasks:
+                        downstream_task._remove_direct_relative_id(task_id, upstream=True)

                     cross_downstream(from_tasks=upstream_tasks, to_tasks=subdag.roots)
                     cross_downstream(from_tasks=subdag.leaves, to_tasks=downstream_tasks)
-
-                    for subdag_task in subdag.tasks:
-                        subdag_task.dag_id = root_dag.dag_id
-                        subdag_task.parent_group = parent_dag.dag_id
-                        subdag_task.current_group = task.task_id  # dag.dag_id
-
+
+                    del dag.task_dict[task_id]
                     self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)

-            self.dags[dag.dag_id] = dag
-            self.log.debug('Loaded DAG %s', dag)
+            if dag is root_dag:
+                test_cycle(root_dag)
+                self.dags[dag.dag_id] = dag
+                self.log.debug('Loaded DAG %s', dag)
         except AirflowDagCycleException as cycle_exception:
-            # There was an error in bagging the dag. Remove it from the list of dags
             self.log.exception('Exception bagging dag: %s', dag.dag_id)
-            # Only necessary at the root level since DAG.subdags automatically
-            # performs DFS to search through all subdags
-            if dag == root_dag:
-                for subdag in subdags:
-                    if subdag.dag_id in self.dags:
-                        del self.dags[subdag.dag_id]
             raise cycle_exception

     def collect_dags(
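Combined with the new BaseOperator._remove_direct_relative_id helper, the net effect of this fixup is a graph splice: the SubDagOperator node is dropped from the task dict, its id is removed from its neighbours' relative-id sets, and the neighbours are cross-wired to the subdag's roots and leaves. A self-contained model of that splice over plain adjacency sets (splice_out and the example ids are illustrative, not code from the patch):

def splice_out(graph, node, roots, leaves):
    """Remove `node`, wiring its upstreams to `roots` and `leaves` to its downstreams."""
    upstreams = [t for t, outs in graph.items() if node in outs]
    downstreams = graph.pop(node)            # drop the node, like del task_dict[task_id]
    for up in upstreams:
        graph[up].discard(node)              # what _remove_direct_relative_id does
        graph[up].update(roots)              # cross_downstream(upstream_tasks, subdag.roots)
    for leaf in leaves:
        graph.setdefault(leaf, set()).update(downstreams)  # subdag.leaves -> downstreams


graph = {"start": {"sub"}, "sub": {"end"}, "end": set()}
graph.update({"sub.a": {"sub.b"}, "sub.b": set()})  # the unpacked subdag tasks
splice_out(graph, "sub", roots={"sub.a"}, leaves={"sub.b"})
print(graph)  # {'start': {'sub.a'}, 'end': set(), 'sub.a': {'sub.b'}, 'sub.b': {'end'}}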
From e401ed8871cea2d931e2a57846c712b58b439baf Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Thu, 11 Jun 2020 15:32:37 -0700
Subject: [PATCH 3/8] Finalize subdag parsing logic

---
 airflow/models/dagbag.py | 56 +++++++++++++++++++---------------------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 4f94b45d5cb8a..7421ee9e1604a 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -361,36 +361,34 @@ def bag_dag(self, dag, root_dag):
         for task in dag.tasks:
             settings.policy(task)

-        try:
-            from airflow.operators.subdag_operator import SubDagOperator
-            from airflow.models.baseoperator import cross_downstream
-            for task_id, task in dag.task_dict.copy().items():
-                if not isinstance(task, SubDagOperator):
-                    continue
-                else:
-                    subdag = task.subdag
-
-                    for subdag_task in subdag.tasks:
-                        del subdag_task._dag
-                        root_dag.add_task(subdag_task)
-                        subdag_task.parent_group = parent_dag.dag_id
-                        subdag_task.current_group = dag.dag_id
-
-                    upstream_tasks = task.upstream_list
-                    for upstream_task in upstream_tasks:
-                        upstream_task._remove_direct_relative_id(task_id, upstream=False)
-
-                    downstream_tasks = task.downstream_list
-                    for downstream_task in downstream_tasks:
-                        downstream_task._remove_direct_relative_id(task_id, upstream=True)
-
-                    cross_downstream(from_tasks=upstream_tasks, to_tasks=subdag.roots)
-                    cross_downstream(from_tasks=subdag.leaves, to_tasks=downstream_tasks)
-
-                    del dag.task_dict[task_id]
-                    self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
+        from airflow.operators.subdag_operator import SubDagOperator
+        from airflow.models.baseoperator import cross_downstream
+        for task_id, task in dag.task_dict.copy().items():
+            if not isinstance(task, SubDagOperator):
+                continue
+            else:
+                del root_dag.task_dict[task_id]
+
+                subdag = task.subdag
+                for subdag_task in subdag.tasks:
+                    del subdag_task._dag
+                    root_dag.add_task(subdag_task)
+                    subdag_task.parent_group = parent_dag.dag_id
+                    subdag_task.current_group = dag.dag_id
+
+                upstream_tasks = task.upstream_list
+                for upstream_task in upstream_tasks:
+                    upstream_task._remove_direct_relative_id(task_id, upstream=False)
+                cross_downstream(from_tasks=upstream_tasks, to_tasks=subdag.roots)
+
+                downstream_tasks = task.downstream_list
+                for downstream_task in downstream_tasks:
+                    downstream_task._remove_direct_relative_id(task_id, upstream=True)
+                cross_downstream(from_tasks=subdag.leaves, to_tasks=downstream_tasks)
+
+                self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)

+        try:
             if dag is root_dag:
                 test_cycle(root_dag)
                 self.dags[dag.dag_id] = dag
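With bag_dag now recursing on the same root_dag, arbitrarily nested subdags all collapse onto the root during parsing. The same idea in miniature, modelling a dag as a plain {task_id: subdag_or_None} mapping (purely illustrative, not the patch's data structures):

def flatten(dag, root=None):
    # bag_dag in miniature: plain tasks land on the root; a SubDagOperator's
    # subdag is recursed into with the same root, so the wrapper disappears.
    root = root if root is not None else {}
    for task_id, subdag in dag.items():
        if subdag is None:
            root[task_id] = None
        else:
            flatten(subdag, root)
    return root


nested = {"a": None, "s1": {"s1.x": None, "s1.s2": {"s1.s2.y": None}}, "b": None}
print(sorted(flatten(nested)))  # ['a', 'b', 's1.s2.y', 's1.x']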
From e82eb6bfb66f6371d67a0a9e7fa57530355d23e4 Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Thu, 11 Jun 2020 15:37:54 -0700
Subject: [PATCH 4/8] fixup! Finalize subdag parsing logic

---
 airflow/models/dagbag.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 7421ee9e1604a..84d9981505d69 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -355,15 +355,15 @@ def bag_dag(self, dag, root_dag):
         Throws AirflowDagCycleException if a cycle is detected in this
         dag or its subdags
         """
-        dag.resolve_template_files()
-        dag.last_loaded = timezone.utcnow()
+        root_dag.resolve_template_files()
+        root_dag.last_loaded = timezone.utcnow()

         for task in dag.tasks:
             settings.policy(task)

         from airflow.operators.subdag_operator import SubDagOperator
         from airflow.models.baseoperator import cross_downstream
-        for task_id, task in dag.task_dict.copy().items():
+        for task_id, task in root_dag.task_dict.copy().items():
             if not isinstance(task, SubDagOperator):
                 continue
             else:
@@ -375,7 +375,7 @@ def bag_dag(self, dag, root_dag):
                     root_dag.add_task(subdag_task)
                     subdag_task.parent_group = parent_dag.dag_id
                     subdag_task.current_group = dag.dag_id
-
+
                 upstream_tasks = task.upstream_list
                 for upstream_task in upstream_tasks:
                     upstream_task._remove_direct_relative_id(task_id, upstream=False)
@@ -385,7 +385,7 @@ def bag_dag(self, dag, root_dag):
                 for downstream_task in downstream_tasks:
                     downstream_task._remove_direct_relative_id(task_id, upstream=True)
                 cross_downstream(from_tasks=subdag.leaves, to_tasks=downstream_tasks)
-
+
                 self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)

         try:

From e23c5227977c531c19201653c9f3bf985746e59c Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Thu, 11 Jun 2020 15:52:25 -0700
Subject: [PATCH 5/8] fixup! fixup! Finalize subdag parsing logic

---
 airflow/models/dagbag.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 84d9981505d69..32749761809c6 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -355,15 +355,14 @@ def bag_dag(self, dag, root_dag):
         Throws AirflowDagCycleException if a cycle is detected in this
         dag or its subdags
         """
-        root_dag.resolve_template_files()
-        root_dag.last_loaded = timezone.utcnow()
+        dag.resolve_template_files()

         for task in dag.tasks:
             settings.policy(task)

         from airflow.operators.subdag_operator import SubDagOperator
         from airflow.models.baseoperator import cross_downstream
-        for task_id, task in root_dag.task_dict.copy().items():
+        for task_id, task in dag.task_dict.copy().items():
             if not isinstance(task, SubDagOperator):
                 continue
             else:
@@ -390,7 +389,8 @@ def bag_dag(self, dag, root_dag):

         try:
             if dag is root_dag:
-                test_cycle(root_dag)
+                dag.last_loaded = timezone.utcnow()
+                test_cycle(dag)
                 self.dags[dag.dag_id] = dag
                 self.log.debug('Loaded DAG %s', dag)
         except AirflowDagCycleException as cycle_exception:
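Patch 5 above also settles where the cycle check belongs: test_cycle now runs exactly once, on the root, because edges that used to cross a subdag boundary exist only in the flattened root graph, and checking each subdag in isolation could miss a cycle. A minimal DFS check over the same adjacency-set shape used in the earlier sketches (illustrative; not Airflow's own test_cycle implementation):

def has_cycle(graph):
    # Three-colour DFS: GRAY marks nodes on the current path, so reaching a
    # GRAY node again means we have walked around a cycle.
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for nxt in graph.get(node, ()):
            if color[nxt] == GRAY or (color[nxt] == WHITE and visit(nxt)):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)


# A cycle that only becomes visible once subdag edges are merged into the root:
flat = {"t1": {"sub.a"}, "sub.a": {"t1"}}
print(has_cycle(flat))  # True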
From e0d158fe9ff7e519114c3b497ab2dc6c788444ed Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Thu, 11 Jun 2020 16:35:58 -0700
Subject: [PATCH 6/8] fixup! fixup! fixup! Finalize subdag parsing logic

---
 airflow/models/dagbag.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 32749761809c6..aa2f22a10a69f 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -372,8 +372,8 @@ def bag_dag(self, dag, root_dag):
                 for subdag_task in subdag.tasks:
                     del subdag_task._dag
                     root_dag.add_task(subdag_task)
-                    subdag_task.parent_group = parent_dag.dag_id
-                    subdag_task.current_group = dag.dag_id
+                    subdag_task.parent_group = dag.dag_id
+                    subdag_task.current_group = subdag.dag_id

From 18c60b529ce2f4734703a10e798f2f4dcde6b431 Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Thu, 11 Jun 2020 17:04:12 -0700
Subject: [PATCH 7/8] Rewrite SubDagOperator

---
 airflow/models/baseoperator.py       |   2 +-
 airflow/operators/subdag_operator.py | 212 +++++----------------------
 2 files changed, 35 insertions(+), 179 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 38f577d1a9acc..9537b243c6fa1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -456,7 +456,7 @@ def __init__(
         # subdag parameter is only set for SubDagOperator.
         # Setting it to None by default as other Operators do not have that field
         from airflow.models.dag import DAG
-        self.subdag: Optional[DAG] = None
+        self._subdag: Optional[DAG] = None

         self._log = logging.getLogger("airflow.task.operators")

diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index 660687069ac08..50efaba6ba254 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -18,197 +18,53 @@
 """
 The module which provides a way to nest your DAGs and so your levels of complexity.
 """
-from enum import Enum
-from typing import Optional
+from typing import Callable, Optional

+from cached_property import cached_property
-from sqlalchemy.orm.session import Session
-
-from airflow.api.common.experimental.get_task_instance import get_task_instance
-from airflow.exceptions import AirflowException, TaskInstanceNotFound
-from airflow.models import DagRun
-from airflow.models.dag import DAG, DagContext
-from airflow.models.pool import Pool
-from airflow.models.taskinstance import TaskInstance
-from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.models.dag import DAG
+from airflow.models.baseoperator import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import State
-from airflow.utils.types import DagRunType


-class SkippedStatePropagationOptions(Enum):
-    """
-    Available options for skipped state propagation of subdag's tasks to parent dag tasks.
+class SubDagOperator(BaseOperator):
+    """
-    ALL_LEAVES = 'all_leaves'
-    ANY_LEAF = 'any_leaf'
+    This creates a SubDag. A SubDag's tasks are recursively unpacked and appended
+    to the root DAG during parsing.

+    The factory function should satisfy the following signature:

-class SubDagOperator(BaseSensorOperator):
-    """
-    This runs a sub dag. By convention, a sub dag's dag_id
-    should be prefixed by its parent and a dot. As in `parent.child`.
+    def dag_factory(dag_id, ...):
+        dag = DAG(
+            dag_id=dag_id,
+            ...
+        )
+        return dag

-    Although SubDagOperator can occupy a pool/concurrency slot,
-    user can specify the mode=reschedule so that the slot will be
-    released periodically to avoid potential deadlock.
+    The first positional argument must be a dag_id, which is passed to the DAG
+    constructor. Internally, the operator's task_id is passed in as that argument
+    to create the metadata used to render grouping in the UI.

-    :param subdag: the DAG object to run as a subdag of the current DAG.
-    :param session: sqlalchemy session
-    :param propagate_skipped_state: by setting this argument you can define
-        whether the skipped state of leaf task(s) should be propagated to the parent dag's downstream task.
+    :param subdag_factory: a DAG factory function that returns a DAG when called
+    :param subdag_args: a list of positional arguments that will get unpacked when
+        calling the factory function
+    :param subdag_kwargs: a dictionary of keyword arguments that will get unpacked
+        in the factory function
     """

     ui_color = '#555'
     ui_fgcolor = '#fff'

-    @provide_session
     @apply_defaults
     def __init__(self,
-                 *,
-                 subdag: DAG,
-                 session: Optional[Session] = None,
-                 propagate_skipped_state: Optional[SkippedStatePropagationOptions] = None,
-                 **kwargs) -> None:
-        super().__init__(**kwargs)
-        self.subdag = subdag
-        self.propagate_skipped_state = propagate_skipped_state
-
-        self._validate_dag(kwargs)
-        self._validate_pool(session)
-
-    def _validate_dag(self, kwargs):
-        dag = kwargs.get('dag') or DagContext.get_current_dag()
-
-        if not dag:
-            raise AirflowException('Please pass in the `dag` param or call within a DAG context manager')
-
-        if dag.dag_id + '.' + kwargs['task_id'] != self.subdag.dag_id:
-            raise AirflowException(
-                "The subdag's dag_id should have the form '{{parent_dag_id}}.{{this_task_id}}'. "
-                "Expected '{d}.{t}'; received '{rcvd}'.".format(
-                    d=dag.dag_id, t=kwargs['task_id'], rcvd=self.subdag.dag_id)
-            )
-
-    def _validate_pool(self, session):
-        if self.pool:
-            conflicts = [t for t in self.subdag.tasks if t.pool == self.pool]
-            if conflicts:
-                # only query for pool conflicts if one may exist
-                pool = (session
-                        .query(Pool)
-                        .filter(Pool.slots == 1)
-                        .filter(Pool.pool == self.pool)
-                        .first())
-                if pool and any(t.pool == self.pool for t in self.subdag.tasks):
-                    raise AirflowException(
-                        'SubDagOperator {sd} and subdag task{plural} {t} both '
-                        'use pool {p}, but the pool only has 1 slot. The '
-                        'subdag tasks will never run.'.format(
-                            sd=self.task_id,
-                            plural=len(conflicts) > 1,
-                            t=', '.join(t.task_id for t in conflicts),
-                            p=self.pool
-                        )
-                    )
-
-    def _get_dagrun(self, execution_date):
-        dag_runs = DagRun.find(
-            dag_id=self.subdag.dag_id,
-            execution_date=execution_date,
-        )
-        return dag_runs[0] if dag_runs else None
-
-    def _reset_dag_run_and_task_instances(self, dag_run, execution_date):
-        """
-        Set the DagRun state to RUNNING and set the failed TaskInstances to None state
-        for scheduler to pick up.
-
-        :param dag_run: DAG run
-        :param execution_date: Execution date
-        :return: None
-        """
-        with create_session() as session:
-            dag_run.state = State.RUNNING
-            session.merge(dag_run)
-            failed_task_instances = (session
-                                     .query(TaskInstance)
-                                     .filter(TaskInstance.dag_id == self.subdag.dag_id)
-                                     .filter(TaskInstance.execution_date == execution_date)
-                                     .filter(TaskInstance.state.in_([State.FAILED, State.UPSTREAM_FAILED])))
-
-            for task_instance in failed_task_instances:
-                task_instance.state = State.NONE
-                session.merge(task_instance)
-            session.commit()
-
-    def pre_execute(self, context):
-        execution_date = context['execution_date']
-        dag_run = self._get_dagrun(execution_date)
-
-        if dag_run is None:
-            dag_run = self.subdag.create_dagrun(
-                run_type=DagRunType.SCHEDULED,
-                execution_date=execution_date,
-                state=State.RUNNING,
-                external_trigger=True,
-            )
-            self.log.info("Created DagRun: %s", dag_run.run_id)
-        else:
-            self.log.info("Found existing DagRun: %s", dag_run.run_id)
-            if dag_run.state == State.FAILED:
-                self._reset_dag_run_and_task_instances(dag_run, execution_date)
-
-    def poke(self, context):
-        execution_date = context['execution_date']
-        dag_run = self._get_dagrun(execution_date=execution_date)
-        return dag_run.state != State.RUNNING
-
-    def post_execute(self, context, result=None):
-        execution_date = context['execution_date']
-        dag_run = self._get_dagrun(execution_date=execution_date)
-        self.log.info("Execution finished. State is %s", dag_run.state)
-
-        if dag_run.state != State.SUCCESS:
-            raise AirflowException(
-                "Expected state: SUCCESS. Actual state: {}".format(dag_run.state)
-            )
-
-        if self.propagate_skipped_state and self._check_skipped_states(context):
-            self._skip_downstream_tasks(context)
-
-    def _check_skipped_states(self, context):
-        leaves_tis = self._get_leaves_tis(context['execution_date'])
-
-        if self.propagate_skipped_state == SkippedStatePropagationOptions.ANY_LEAF:
-            return any(ti.state == State.SKIPPED for ti in leaves_tis)
-        if self.propagate_skipped_state == SkippedStatePropagationOptions.ALL_LEAVES:
-            return all(ti.state == State.SKIPPED for ti in leaves_tis)
-        raise AirflowException(
-            'Unimplemented SkippedStatePropagationOptions {} used.'.format(self.propagate_skipped_state))
-
-    def _get_leaves_tis(self, execution_date):
-        leaves_tis = []
-        for leaf in self.subdag.leaves:
-            try:
-                ti = get_task_instance(
-                    dag_id=self.subdag.dag_id,
-                    task_id=leaf.task_id,
-                    execution_date=execution_date
-                )
-                leaves_tis.append(ti)
-            except TaskInstanceNotFound:
-                continue
-        return leaves_tis
-
-    def _skip_downstream_tasks(self, context):
-        self.log.info('Skipping downstream tasks because propagate_skipped_state is set to %s '
-                      'and skipped task(s) were found.', self.propagate_skipped_state)
-
-        downstream_tasks = context['task'].downstream_list
-        self.log.debug('Downstream task_ids %s', downstream_tasks)
-
-        if downstream_tasks:
-            self.skip(context['dag_run'], context['execution_date'], downstream_tasks)
-
-        self.log.info('Done.')
+                 subdag_factory: Callable[..., DAG],
+                 subdag_args: Optional[list] = None,
+                 subdag_kwargs: Optional[dict] = None,
+                 *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.subdag_args = subdag_args or []
+        self.subdag_kwargs = subdag_kwargs or {}
+        self.subdag_factory = subdag_factory
+
+    @cached_property
+    def subdag(self) -> DAG:
+        """The SubDag carried by the operator."""
+        self._subdag = self.subdag_factory(self.task_id, *self.subdag_args, **self.subdag_kwargs)
+        return self._subdag

From 410053d77fcb060238c71a6e653cc83b5108d4f5 Mon Sep 17 00:00:00 2001
From: Xinbin Huang
Date: Thu, 11 Jun 2020 17:54:34 -0700
Subject: [PATCH 8/8] Add TaskGroup model

- Originally proposed to allow running SubDag tasks as part of the parent
  DAG while keeping the visual grouping effect in the Graph/Tree view
  (see issue 8078).
- This can be further extended to allow arbitrary grouping of tasks,
  making other metadata operations possible.
---
 airflow/models/baseoperator.py | 10 ++++++++++
 airflow/models/task_group.py   | 31 +++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+)
 create mode 100644 airflow/models/task_group.py

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 9537b243c6fa1..4f105ab86998b 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -264,6 +264,10 @@ class derived from this one results in the creation of a task object,
     :param do_xcom_push: if True, an XCom is pushed containing the Operator's
         result
     :type do_xcom_push: bool
+    :param current_group: the id of the group this task directly belongs to
+    :type current_group: str
+    :param parent_group: the id of the group that contains this task's current group
+    :type parent_group: str
     """

     # For derived classes to define which fields will get jinjaified
     template_fields: Iterable[str] = ()
@@ -359,6 +363,9 @@ def __init__(
         do_xcom_push: bool = True,
         inlets: Optional[Any] = None,
         outlets: Optional[Any] = None,
+        current_group: Optional[str] = None,
+        parent_group: Optional[str] = None,
+        *args, **kwargs
     ):
         from airflow.models.dag import DagContext
@@ -473,6 +480,9 @@ def __init__(
         if outlets:
             self._outlets = outlets if isinstance(outlets, list) else [outlets, ]

+        self.current_group = current_group
+        self.parent_group = parent_group
+
     def __eq__(self, other):
         if type(self) is type(other) and self.task_id == other.task_id:
             return all(self.__dict__.get(c, None) == other.__dict__.get(c, None) for c in self._comps)
diff --git a/airflow/models/task_group.py b/airflow/models/task_group.py
new file mode 100644
index 0000000000000..d57c41f82d977
--- /dev/null
+++ b/airflow/models/task_group.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column, ForeignKey, String
+
+from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+
+
+class TaskGroup(Base):
+    """
+    One task-group record per task per DAG; the grouping is rendered in the
+    Graph/Tree view.
+    """
+    __tablename__ = "task_group"
+
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), ForeignKey('dag.dag_id'), primary_key=True)
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    current_group = Column(String(ID_LEN))
+    parent_group = Column(String(ID_LEN))
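Two closing sketches tie the series together. First, going by the docstring added in patch 7, usage of the rewritten SubDagOperator would look roughly like this; the dag ids, dates, and the DummyOperator import path are assumptions for illustration, not part of the series:

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator


def my_subdag(dag_id, n_tasks=2):
    # The factory receives the operator's task_id as its dag_id and returns a
    # DAG whose tasks are spliced into the parent at parse time.
    dag = DAG(dag_id=dag_id, start_date=datetime(2020, 6, 1))
    for i in range(n_tasks):
        DummyOperator(task_id='load_{}'.format(i), dag=dag)
    return dag


with DAG(dag_id='parent', start_date=datetime(2020, 6, 1)) as dag:
    start = DummyOperator(task_id='start')
    section = SubDagOperator(task_id='section_1',
                             subdag_factory=my_subdag,
                             subdag_kwargs={'n_tasks': 3})
    end = DummyOperator(task_id='end')
    start >> section >> end

Second, patch 8 defines the task_group table but the series does not populate it yet. One plausible way rows could be derived from the current_group/parent_group attributes that bag_dag stamps onto unpacked tasks is sketched below; the task_group_rows helper and the example values in the comments are hypothetical:

from airflow.models.task_group import TaskGroup


def task_group_rows(dag, tasks):
    # Hypothetical helper: build one TaskGroup row per grouped task so the
    # UI can reconstruct the Graph/Tree grouping after subdags are flattened.
    rows = []
    for task in tasks:
        if task.current_group is None:
            continue  # top-level task: no grouping metadata to record
        rows.append(TaskGroup(
            dag_id=dag.dag_id,
            task_id=task.task_id,
            current_group=task.current_group,  # e.g. 'parent.section_1'
            parent_group=task.parent_group,    # e.g. 'parent'
        ))
    return rows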