Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added taskgroup decorator #13479

Closed
wants to merge 7 commits into from

Conversation

VBhojawala
Copy link
Contributor

Added taskgroup decorator which can be used to create taskgroup from python callable. Inside python callable tasks can be grouped by calling tasks (python callable ) created with task decorator.

  • taskgroup decorator takes both optional and keyword argument which are passed to Constructor of TaskGroup class.

  • TaskGroup can be created with one of following syntax

@taskgroup
@taskgroup()
@taskgroup(group_id='group_name')
  • TaskGroup class constructor takes one mandatory argument group_id, if not given in decorator it sets group_id to python callable name.

Following is a simple example demonstrating use of taskgroup decorator grouping multiple tasks.

@task
def task_1(value):
  return f'[ Task1 {value} ]'


@task
def task_2(value):
  print(f'[ Task2 {value} ]')


@taskgroup
def section_1(value):
  return task_2(task_1(value))

taskgroup decorator utilizes existing TaskGroup context manager currently used for creating TaskGroup, which means we can create nested taskgroup by created nested callable. Following is an example demonstrating use of nested taskgroup.

@task
def task_start():
    return '[Task_start]'

@task
def task_end():
    print(f'[ Task_End ]')

@task
def task_1(value):
    return f'[ Task1 {value} ]'

@task
def task_2(value):
    print(f'[ Task2 {value} ]')

@task
def task_3(value):
    return f'[ Task3 {value} ]'

@task
def task_4(value):
    print(f'[ Task4 {value} ]')

@taskgroup
def section_1(value):

    @taskgroup
    def section_2(value2):
        return task_4(task_3(value2))

    op1 = task_2(task_1(value))
    return section_2(op1) 

Dedicated test cases for taskgroup decorator is created in file
/tests/utils/test_task_group_decorator.py

Recent changes

  • Added logic to append suffix to duplicate group_id.

closes: #11870

@VBhojawala
Copy link
Contributor Author

VBhojawala commented Jan 18, 2021

Hi @casassg @yuqian90 @ashb,

I have raised new pull request with suggested changes. Kindly review it.

airflow/utils/task_group.py Outdated Show resolved Hide resolved
@VBhojawala VBhojawala requested a review from ashb January 19, 2021 14:16
Copy link
Contributor

@casassg casassg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks great! Thanks for taking your time to contribute this!

a small nit: Could you add an extra test that calls a taskgroup decorated function 2 times.

@taskgroup
def training(paths):
  validate(train(paths))

@dag
def pipeline():
  training('some/path')
  training('some/other/path')

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

Copy link
Contributor

@casassg casassg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ty! This looks good to me. Will need a committer for a final review

@kaxil
Copy link
Member

kaxil commented Feb 11, 2021

Can you rebase the PR on latest master please

@kaxil kaxil requested a review from turbaszek February 11, 2021 23:50
@VBhojawala
Copy link
Contributor Author

Hi @kaxil, I have rebased the PR.

airflow/utils/task_group.py Outdated Show resolved Hide resolved
@VBhojawala VBhojawala requested a review from kaxil February 13, 2021 06:19
@github-actions
Copy link

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Feb 13, 2021
@github-actions
Copy link

The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the the workflow link to check the reason.

@github-actions
Copy link

The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the the workflow link to check the reason.

@ashb ashb added this to the Airflow 2.1 milestone Feb 17, 2021
airflow/utils/task_group.py Outdated Show resolved Hide resolved
Comment on lines +834 to +837
with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
task_group1()
task_group2()
task_group3()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is the right behavour!

This is separate task groups with the same id. That feels like a logic bug on the DAG author part and should throw an error.

What do others think of this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. But we can't retrieve the function for previously called decorated functions, which means that we can't know wether the function name is the same or not (aka wether its the same function being called 2 times or 2 functions w the same group_id). So while I agree this is an unintended side effect, I would leave it as is as it allows us to instatiate the same function several times

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't check what task group decorator created the previous function, but we could check if the current TG was created by the current decorated function, and if not then it would fail.

We could keep a set of TG objects in the wrapper fn, and if the TG already exists on the DAG, and the TG is not in that set, then it was created by some other decorator -> raise an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I don't think it's a big deal either way. If users do this, even though it's weird and can be confusing, they can do it.

Mostly want to make sure we are valuing the right trade offs between complexity added and value of having this extra check.

Not strongly opinionated though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought was that this is more likely a bug than desired behaviour -- for instance someone copy-and-pasted a task group but forgot to update the name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @casassg here. It would be very hard to enforce this. I'm especially imagining situations where users import taskgroups from libraries and therefore do not have control over the task_group_id. That said, it would make sense for us to allow users to override the task_group_id via a kwarg in case they import two functions with the same id name.

Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
@github-actions github-actions bot removed the full tests needed We need to run full set of tests for this PR to merge label Feb 17, 2021
@VBhojawala VBhojawala requested a review from ashb February 24, 2021 07:07


# Creating TaskGroups
@taskgroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be nicer to call it "@task_group" instead of "@TaskGroup" since there's no capitalization

@taskgroup
def section_1(value):
""" TaskGroup for grouping related Tasks"""
return task_3(task_2(task_1(value)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you unwind this? I wound't want to encourage people to nest like this.

@dimberman
Copy link
Contributor

Hi @VBhojawala are you interested in continuing this PR? I'm glad to either finish it for you or help you finish :)

@ashb
Copy link
Member

ashb commented Mar 15, 2021

Hi @dimberman ,

Kindly review below PR and help me to finish it :
#13405

related: #8970

@VBhojawala commenting on another unrelated PR asking for a review is considered poor etiquette - please don't do this.

@dimberman dimberman mentioned this pull request Mar 26, 2021
@ashb
Copy link
Member

ashb commented Mar 31, 2021

Closing in favour of #15034 which continues this work.

@ashb ashb closed this Mar 31, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add @taskgroup decorator
6 participants