Add new datetime branch operator #11964

Merged: 15 commits, Mar 10, 2021

tomasfarias
Contributor

closes: #11929

This PR includes a new datetime branching operator: the current date and time, as given by datetime.datetime.now is compared against target datetime attributes, like year or hour, to decide which task id branch to take.

No tests were written yet as this is intended as a POC to allow us to review the implementation first.

^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@boring-cyborg

boring-cyborg bot commented Oct 30, 2020

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@github-actions

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@mik-laj
Member

mik-laj commented Oct 30, 2020

DateTimeSensor has a much simpler interface. Why do we need so many parameters here?

@mik-laj mik-laj requested a review from kaxil October 30, 2020 11:39

@eladkal
Contributor

eladkal commented Oct 30, 2020

I don't think you can reliably compare against an exact datetime, as you can't know when the task will be executed. For example, branching based on a specific minute probably won't work as expected.

I think the concept should rather be an acceptable range with a lower and an upper bound. If the current time is within the range, continue with the true branch; otherwise continue with the false branch.
WDYT?

@kaxil
Member

kaxil commented Nov 4, 2020

Can you please rebase your PR on the latest master, since we have applied Black and PyUpgrade on master?

It will help if you squash your commits into a single commit first so that there are fewer conflicts.

@tomasfarias
Contributor Author

@eladkal I agree, any unit smaller than hour is probably not something you'd be able to match reliably anyways 😓. I like your idea, and I've also allowed for either the upper bound or the lower bound to be set as None, but not both, to allow for unilateral comparisons, for example: if target_upper is None and the current datetime falls above target_lower then return true branch.
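The range check described above can be sketched roughly as follows. This is a minimal illustration, not the operator's code: the function name `in_target_range` is hypothetical, and treating both bounds as inclusive is an assumption.

```python
import datetime
from typing import Optional


def in_target_range(
    now: datetime.datetime,
    target_lower: Optional[datetime.datetime],
    target_upper: Optional[datetime.datetime],
) -> bool:
    """Return True when `now` lies within the (possibly one-sided) range."""
    # A bound set to None means "no limit" on that side.
    # Inclusive bounds are an assumption made for this sketch.
    if target_lower is not None and now < target_lower:
        return False
    if target_upper is not None and now > target_upper:
        return False
    return True


lower = datetime.datetime(2020, 7, 7, 10, 0)
upper = datetime.datetime(2020, 7, 7, 11, 0)

inside = in_target_range(datetime.datetime(2020, 7, 7, 10, 30), lower, upper)
above_lower_only = in_target_range(datetime.datetime(2020, 7, 7, 12, 0), lower, None)
```

A True result would map to the true branch and a False result to the false branch. The rule that both bounds cannot be None at once is not enforced here; it would belong wherever the bounds are first accepted.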

This also reduces the number of arguments, which was getting unnecessarily out of control as @mik-laj suggested.

Branch has been rebased as @kaxil asked.

I was having a bit of trouble getting the docs build to pass. It now passes locally, so I'm expecting it to pass with the latest commit.

Thanks all for reviews and input 💪

@tomasfarias tomasfarias marked this pull request as ready for review November 9, 2020 17:18
follow_task_ids_if_false: Union[str, Iterable[str]],
target_lower: Optional[datetime.datetime],
target_upper: Optional[datetime.datetime],
timezone: Optional[datetime.timezone] = None,
Contributor

I think this param is redundant.
We can respect the default timezone set in airflow.cfg and allow overwrite if the DAG is timezone aware.
Similar to how TimeSensor was modified #9882 #9699

Contributor Author

Thanks for pointing it out 👍 Latest commit should replicate the work done for TimeSensor.
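For readers unfamiliar with the pattern, the idea of dropping the explicit `timezone` argument and comparing in the DAG's own timezone can be illustrated with the standard library alone. This is only a sketch: `naive_now_in` is a hypothetical helper, the fixed UTC+2 offset stands in for whatever timezone a DAG carries, and Airflow's own timezone utilities are deliberately not used here.

```python
import datetime


def naive_now_in(tz: datetime.tzinfo, utc_now: datetime.datetime) -> datetime.datetime:
    """Express an aware UTC timestamp as a naive wall-clock time in `tz`."""
    # Convert to the target timezone, then drop tzinfo so the result can be
    # compared against naive target datetimes.
    return utc_now.astimezone(tz).replace(tzinfo=None)


# Stand-in for a DAG timezone (assumption: a fixed UTC+2 offset).
dag_tz = datetime.timezone(datetime.timedelta(hours=2))
utc_now = datetime.datetime(2020, 7, 7, 9, 0, tzinfo=datetime.timezone.utc)

local_now = naive_now_in(dag_tz, utc_now)
```

With this approach the user-supplied targets stay naive and are interpreted as wall-clock times in the DAG's timezone.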

Contributor

Can you add unit tests?

Contributor Author

Unit tests added 🙌

@tomasfarias tomasfarias force-pushed the branch-datetime-operator branch 2 times, most recently from 0fe054c to f6ff87e Compare November 11, 2020 19:27
airflow/operators/datetime_branch_operator.py (outdated)
`datetime.datetime.now()` falls below target_lower or above `target_upper`.
:type follow_task_ids_if_false: str or list[str]
:param target_lower: target lower bound.
:type target_lower: Optional[datetime.datetime]
Contributor

Why is this optional?

Contributor Author

Both target_* arguments are optional to support unilateral comparisons, i.e. comparing the current time to a lower target but without any upper bound and vice-versa. We do need at least one of the two to be defined otherwise there is no target to compare the current date to, which is why that is checked below in __init__.
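The "at least one bound" check mentioned above might look roughly like this. It is a sketch under assumptions: `TargetRange` is a hypothetical stand-in class rather than the operator, and `ValueError` is used only for illustration (the operator may raise a different exception type).

```python
import datetime
from typing import Optional


class TargetRange:
    """Hypothetical holder for the two bounds; not the operator itself."""

    def __init__(
        self,
        target_lower: Optional[datetime.datetime],
        target_upper: Optional[datetime.datetime],
    ) -> None:
        # Either bound may be None for a one-sided comparison, but not both:
        # with no bound at all there is nothing to compare the current time to.
        if target_lower is None and target_upper is None:
            raise ValueError("At least one of target_lower and target_upper must be set.")
        self.target_lower = target_lower
        self.target_upper = target_upper


upper_only = TargetRange(None, datetime.datetime(2020, 7, 7, 11, 0))

try:
    TargetRange(None, None)
    rejected = False
except ValueError:
    rejected = True
```

Failing at construction time surfaces the misconfiguration when the DAG is parsed instead of when the task runs.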

:param target_lower: target lower bound.
:type target_lower: Optional[datetime.datetime]
:param target_upper: target upper bound.
:type target_upper: Optional[datetime.datetime]
Contributor

same question here

Contributor Author

Answered above ☝️

Comment on lines 218 to 233
branch_op = DateTimeBranchOperator(
    task_id='datetime_branch',
    follow_task_ids_if_true='branch_1',
    follow_task_ids_if_false='branch_2',
    target_upper=datetime.datetime(2020, 7, 7, 11, 0, 0),
    target_lower=None,
    dag=self.dag,
)

self.branch_1.set_upstream(branch_op)
self.branch_2.set_upstream(branch_op)
self.dag.clear()

dr = self.dag.create_dagrun(
    run_id='manual__', start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.RUNNING
)
Member

This appears in every test with almost identical content -- could you create a helper function to reduce this boilerplate, please?

Contributor Author

Good suggestion! Moved boilerplate code over to setUp

Comment on lines 249 to 250
@mock.patch('airflow.operators.datetime_branch_operator.timezone.utcnow')
def test_datetime_branch_operator_lower_comparison_outside_range(self, mock_timezone):
Member

Rather than mocking the datetime like this, please use freezegun

Contributor Author

Thanks for the suggestion! I've used freezegun instead.

@tomasfarias
Contributor Author

Not sure why MySQL build is failing, all tests appear to be passing according to logs.

@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
Contributor

the suffix of _operator in the file name should be removed (to comply with AIP 21)
see #11178

@@ -0,0 +1,195 @@
#
Contributor

please also remove the _operator suffix from the test file

Comment on lines 52 to 53
target_lower: Optional[datetime.datetime],
target_upper: Optional[datetime.datetime],
Contributor

I'm wondering about the use cases of datetime in general.
I would assume that users are more likely to want to branch based on times rather than datetimes.
Do you think we will need a separate TimeBranchOperator, or can the two use cases be combined?

Contributor Author

@tomasfarias tomasfarias Nov 26, 2020

Branching on times should be possible by setting the target values to something like:

target_lower=datetime.datetime.combine(
    datetime.datetime.now(), datetime.time(hour=target_hour, minute=target_minute, second=target_second)
)

But this is not as straightforward as passing a single datetime.time.

I think adding a TimeBranchOperator may be a bit redundant, instead, we could support targets of type Optional[Union[datetime.datetime, datetime.time]], and when encountering a datetime.time we combine it with the current date and proceed as it is right now:

def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
    now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
    target_upper = datetime.datetime.combine(now, self.target_upper) if isinstance(self.target_upper, datetime.time) else self.target_upper

What do you think?
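A self-contained version of the proposal above, for illustration only: `as_datetime` and the `Target` alias are hypothetical names, and the operator context (`self`, `timezone`) is replaced by plain arguments.

```python
import datetime
from typing import Optional, Union

# The union type suggested in the comment above.
Target = Optional[Union[datetime.datetime, datetime.time]]


def as_datetime(target: Target, now: datetime.datetime) -> Optional[datetime.datetime]:
    """Turn a time-of-day target into a datetime on the same date as `now`."""
    if isinstance(target, datetime.time):
        return datetime.datetime.combine(now.date(), target)
    # datetime.datetime values and None pass through unchanged.
    return target


now = datetime.datetime(2020, 7, 7, 10, 30)
from_time = as_datetime(datetime.time(hour=11), now)
from_datetime = as_datetime(datetime.datetime(2021, 1, 1), now)
```

Because `datetime.datetime` is not a subclass of `datetime.time`, the `isinstance` check only matches bare times.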

Contributor

Yeah, I think the Optional[Union[datetime.datetime, datetime.time]] approach is better as it simplifies usage for the users, but it may be worth asking for more opinions on this one.

Note that there are edge cases: for example, lower is 23:00 and upper is 01:00 (the next day), so the window lasts 2 hours in total yet is spread over two dates.

Also, it would be good to add documentation about this operator. You can use previous PRs as examples: #11472

Contributor Author

Figured out it may be easier to hear other opinions with a concrete example, so I implemented the idea mostly as described. The edge case you described is handled by pushing the smaller time to the next day. Should the same thing be done when both times are equal? Or should an exception be raised instead?
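One way to read "pushing the smaller time to the next day" is sketched below. The helper name `resolve_time_window` is hypothetical, both bounds are assumed to be `datetime.time` values, and equal times are left untouched here since that case is the open question.

```python
import datetime
from typing import Tuple


def resolve_time_window(
    lower: datetime.time, upper: datetime.time, now: datetime.datetime
) -> Tuple[datetime.datetime, datetime.datetime]:
    """Anchor two times of day to the date of `now`, wrapping past midnight."""
    lower_dt = datetime.datetime.combine(now.date(), lower)
    upper_dt = datetime.datetime.combine(now.date(), upper)
    # An upper bound earlier than the lower bound is taken to mean
    # "that time on the following day".
    if upper_dt < lower_dt:
        upper_dt += datetime.timedelta(days=1)
    return lower_dt, upper_dt


late_evening = datetime.datetime(2020, 7, 7, 23, 30)
window = resolve_time_window(datetime.time(23, 0), datetime.time(1, 0), late_evening)
```

In this sketch both bounds are anchored to the date of `now`, so a check at 00:30 the next morning resolves to a window that has not started yet.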

Gave it a shot at adding documentation, do let me know if it needs to be extended or rewritten.

Comment on lines 20 to 21
Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
or skipped on alternating runs.
Contributor

This is a description of another operator

Contributor Author

Sorry about that, copy and paste blunder. Thanks for the review!

Contributor

@eladkal eladkal left a comment

LGTM
you will need to rebase to fix the tests/docs due to #13201

Comment on lines 31 to 32
True branch will be returned when `datetime.datetime.now()` falls below
`target_upper` and above `target_lower`.
Member

What's the behaviour when backfilling or catching-up ?

Member

How about adding something like:

if self.use_task_execution_day:
    now = context["execution_date"]
else:
    now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)

cc @eladkal

Contributor Author

@tomasfarias tomasfarias Mar 10, 2021

Good call! Behavior when backfilling would be to still use system date, which is probably not what a user would expect. I added a use_task_execution_date argument very similar to the one you linked, with some minor adaptation since we are talking about dates instead of days of the week.
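The effect of such a flag can be shown outside the operator with a small sketch. `pick_reference_time` and its injectable `clock` parameter are hypothetical, and the context is reduced to a plain dict with an `execution_date` key.

```python
import datetime
from typing import Any, Callable, Dict


def pick_reference_time(
    context: Dict[str, Any],
    use_task_execution_date: bool,
    clock: Callable[[], datetime.datetime] = datetime.datetime.now,
) -> datetime.datetime:
    """Choose which "now" the bounds are compared against."""
    if use_task_execution_date:
        # Backfills and catch-up runs then branch on the run's logical date.
        return context["execution_date"]
    # Otherwise fall back to the wall clock at the moment the task runs.
    return clock()


context = {"execution_date": datetime.datetime(2020, 1, 1)}


def fixed_clock() -> datetime.datetime:
    # Fixed clock so the example is deterministic.
    return datetime.datetime(2021, 3, 10, 12, 0)


backfill_time = pick_reference_time(context, True, fixed_clock)
wall_clock_time = pick_reference_time(context, False, fixed_clock)
```

With the flag enabled, re-running an old DAG run takes the same branch it would have taken originally, regardless of when the backfill happens.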

        self.follow_task_ids_if_false = follow_task_ids_if_false

    def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
        now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
Member

Contributor Author

Implemented your suggestion (see the other comment).

@eladkal
Contributor

eladkal commented Mar 9, 2021

@tomasfarias can you please address issues/questions raised by kaxil?

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
@tomasfarias
Contributor Author

@eladkal Been a bit busy with work the last few weeks, but all comments should be addressed! Thanks for pinging me 👍

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Mar 10, 2021
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@kaxil kaxil merged commit 1e37a11 into apache:master Mar 10, 2021
@boring-cyborg

boring-cyborg bot commented Mar 10, 2021

Awesome work, congrats on your first merged pull request!

@kaxil kaxil added this to the Airflow 2.1 milestone Mar 10, 2021
@eladkal
Contributor

eladkal commented Mar 11, 2021

While commenting on #11931 I noticed that we might have two issues here:

  1. The operator name DateTimeBranchOperator isn't consistent with the other branch operators: BranchPythonOperator, BranchSQLOperator, BranchDayOfWeekOperator. For consistency, the operator should be renamed to BranchDateTimeOperator.
  2. Should datetime_branch.py be datetime.py?

Since the operator isn't released yet we can handle this easily without deprecation notice.
@kaxil WDYT?

I can handle this if @tomasfarias has no time.

@tomasfarias
Contributor Author

@eladkal Things are much quieter in my schedule now so I can push any changes to names if they are deemed appropriate.

I personally agree 100% with 1. as it plays nice with any IDE autocomplete where you can just get all the Branch* operator suggestions typing only that.

For 2. though I see we have both branch_operator.py as well as sql_branch_operator.py, so maybe we should settle for datetime_branch_operator.py? I'm fine either way on this one though.

@eladkal
Contributor

eladkal commented Mar 11, 2021

For 2. though I see we have both branch_operator.py as well as sql_branch_operator.py, so maybe we should settle for datetime_branch_operator.py? I'm fine either way on this one though.

They are deprecated :)
branch_operator.py -> branch.py
sql_branch_operator.py -> sql.py

@tomasfarias
Contributor Author

Now that you mention that, we should also change the deprecated imports, e.g.:

from airflow.operators.branch_operator import BaseBranchOperator

To:

from airflow.operators.branch import BaseBranchOperator

I've pushed the changes to my fork, see this commit: tomasfarias@5fc2862. I can create a PR as soon as the changes are confirmed as needed 👍

Great feedback!

@kaxil
Member

kaxil commented Mar 11, 2021

Good point @eladkal -- I agree with your suggestions.

And yes, since they are not released we can rename them without deprecations.

@tomasfarias
Contributor Author

I opened a PR with the changes suggested, linking here for reference: #14720

Successfully merging this pull request may close these issues.

add DateTimeBranchOperator
6 participants