Skip to content

Conversation

@xinbinhuang
Copy link
Contributor

@xinbinhuang xinbinhuang commented Jun 12, 2020

#8078 cc: @dimberman @kaxil

Hi guys,

Sorry for the long wait. It took me a while to figure out where to properly put the changes and I am also occupied by other things. Anyway, here is a draft PR, for the proposed rewrite of the SubDagOperator. I will write out more details on the AIP tonight. But here is the gist.

There are 3 main changes:

  1. Rewrite DagBag.bag_dag logic to attach tasks of the subdags to the root dag during dag file parsing.

Here is the overall process:

  1. it loops over the root dag's tasks
  2. When it meets a SubDagOperator, it unpacks the SubDag, and reattach all the tasks to the root dag. This unpacking process will traverse down to the SubDag's tasks.

Here is the detailed unpacking process:

  1. Remove the SubDagOperator node from the root_dag
  2. loop over SubDag's task to re-attach them to the root_dag, and add current_group and parent_group metadata (see item 3 below).
  3. Remove SubDagOperator task_id from upstream_tasks and add subdag.roots to the upstream tasks; similarily for downstream_tasks
  4. Traverse down to the subdag,. It will do nothing if there is no nested subdags, and otherwise keep unpacking.
  1. Rewrite the SubDagOperator
    • I remove most of the methods as they are not relevant anymore.
    • Change the function signature to require a dag_factory function, and it will automatically pass the task_id as the first positional argument of the dag_factory function. It's cleaner because we don't need to match up the SubDagOperator.task_id with the subdag.dag_id manually.
  2. Add a TaskGroup model (current_group/current_dag and parent_group/parent_dag) that is used for grouping related tasks. This is used to replace the concept of SubDag to group tasks together and render the visual grouping effect in the UI.
    • When I was creating this, I feel like it does not need to be used for SubDag specifically, and we can actually group arbitrary tasks to allow generic metadata operation (e.g. monitor TIs of a task group, stop/retry a group of task etc)

Here is a example subdag dag I used to test out the new SubDag Operator. It can successfully unpack the subdags.

This is the comparison on the generated graph:

Old:
official_subdag_example

New:
unpacked_subdag_dag

Also, it seems that there are a lot of SubDag related handling logic in the codebase, and it means that if we decide to go forward with the rewrite. We probably will need to clean up a lot of code. I believe it's a good thing?

grep -r subdag airflow | wc -l
162

Here are the things that I think need to be done:

  • Update the UI to review this change
  • Avoid task conflicts between root_dag and sub_dag. I am thinking to automatically concatenate the current_group=subdag.dag_id=subdag_operator.task_id, to the task_ids.
  • DB Migration
  • Clean up other subdag related logics from the codebase

I have not created any tests yet for it as I want to have some discussion first. Let me know your thought :)

Cheers,
Bin


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@xinbinhuang xinbinhuang changed the title [AIP-34] Refactor SubDagOperator [AIP-34] Rewrite SubDagOperator Jun 12, 2020
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With Serialized dag storing everything already I'm not sure if this needs it's own model or not.

Copy link
Contributor Author

@xinbinhuang xinbinhuang Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ash,

I am not familiar with the code on Serialized dag. Can you point me to the code that you are referring to? I will take a look at it

@stale
Copy link

stale bot commented Jul 29, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jul 29, 2020
@dimberman dimberman added the pinned Protect from Stalebot auto closing label Jul 31, 2020
@kaxil kaxil removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 3, 2020
@kaxil
Copy link
Member

kaxil commented Aug 3, 2020

Hi @xinbinhuang Can you rebase the PR, and based on our last discussion in Slack (last month -- or the month before), I think it needs an update

Add subdag tasks to root dag
- It's originally proposed to allow running SubDag tasks as part of parent Dag but still keep the visual grouping effect in Graph/Tree view see issue 8078.
- This can be further extend to allow arbitrary grouping of tasks that make other metadata operations possible.
@kaxil
Copy link
Member

kaxil commented Aug 6, 2020

tests are failing @xinbinhuang

@xinbinhuang xinbinhuang closed this Sep 1, 2020
@xinbinhuang
Copy link
Contributor Author

In favor of #10153

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pinned Protect from Stalebot auto closing

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants