
Add start date to trigger_dagrun operator #18226

Merged (11 commits) on Sep 24, 2021

Conversation

bhavaniravi (Contributor)

closes: #18082



@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:core-operators Operators, Sensors and hooks within Core Airflow labels Sep 14, 2021
@ephraimbuddy (Contributor) left a comment

I think you made the change in the wrong place. Here's where I think the change should be:


Add start_date=self.start_date after the line above.

EDIT:
You are actually correct; just fix the failing tests.

@github-actions (bot)

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Sep 14, 2021
@kaxil (Member)

kaxil commented Sep 15, 2021

=========================== short test summary info ============================
  FAILED tests/models/test_cleartasks.py::TestClearTasks::test_clear_task_instances
  FAILED tests/models/test_cleartasks.py::TestClearTasks::test_clear_task_instances_with_task_reschedule
  FAILED tests/models/test_cleartasks.py::TestClearTasks::test_dag_clear - asse...
  = 3 failed, 2445 passed, 61 skipped, 3 xfailed, 4 warnings in 508.01s (0:08:28) =

Tests are failing.

@bhavaniravi (Contributor, Author)

bhavaniravi commented Sep 15, 2021

Hey @kaxil, I cannot recreate the failing tests using the Breeze env or pytest with the same Python and Postgres versions. It looks like a persistent issue across PRs:

https://github.com/apache/airflow/pull/18243/checks?check_run_id=3604451375
Let me rebase and push again

@ephraimbuddy (Contributor)

Looks like something went wrong in your last rebase. Please rebase again.

@@ -82,6 +82,7 @@ def _trigger_dag(
     for _dag in dags_to_trigger:
         trigger = _dag.create_dagrun(
             run_id=run_id,
+            start_date=timezone.utcnow(),
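The effect of the added line can be sketched with a minimal stand-in (hypothetical names; this is not Airflow's actual create_dagrun implementation):

```python
from datetime import datetime, timezone

def create_dagrun_stub(run_id, start_date=None):
    """Illustrative stub mirroring the diff above: default the run's
    start_date to "now" in UTC when the caller does not supply one."""
    if start_date is None:
        start_date = datetime.now(timezone.utc)
    return {"run_id": run_id, "start_date": start_date}

run = create_dagrun_stub("manual__2021-09-14")
print(run["start_date"].tzinfo)  # UTC
```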


If the DAG you trigger is timezone-aware, does it matter?
I mean, you use the default timezone here, but the DAG you are triggering may be in a different timezone than the default.

bhavaniravi (Contributor, Author)

The start_date here is not the DAG start_date; it is the start time of the dag run. Internally it is all tracked in UTC and rendered in the user's timezone in the UI.

Correct me if I am wrong.
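The point about UTC tracking can be illustrated with plain stdlib datetimes (an illustrative example, not Airflow code): a run start recorded in any zone converts to a single UTC instant for storage, and the UI converts back for display.

```python
from datetime import datetime, timezone, timedelta

# A run that starts at 10:00 in a +05:30 zone...
ist = timezone(timedelta(hours=5, minutes=30))
local_start = datetime(2021, 9, 15, 10, 0, tzinfo=ist)

# ...is stored internally as the equivalent UTC instant...
utc_start = local_start.astimezone(timezone.utc)
print(utc_start.isoformat())  # 2021-09-15T04:30:00+00:00

# ...and rendered back in the viewer's timezone by the UI.
assert utc_start == local_start  # same instant, different rendering
```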

Member

You’re correct. Although maybe we shouldn’t set this to RUNNING in the first place, but should set it to QUEUED and let the scheduler start the run (and set start_date) instead. See how this is implemented in the new REST API (in airflow/api_connexion/endpoints/dag_run_endpoint.py::post_dag_run).
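The queued-then-scheduled flow being suggested can be sketched as a toy state machine (illustrative only; real Airflow state handling lives in the scheduler, and DagRunState here is a stand-in enum, not airflow.utils.state):

```python
from datetime import datetime, timezone
from enum import Enum

class DagRunState(str, Enum):  # stand-in for Airflow's real state values
    QUEUED = "queued"
    RUNNING = "running"

class DagRunStub:
    def __init__(self):
        # Created QUEUED with no start_date, as the REST API endpoint does;
        # the scheduler stamps start_date when it actually starts the run.
        self.state = DagRunState.QUEUED
        self.start_date = None

    def scheduler_starts(self):
        self.state = DagRunState.RUNNING
        self.start_date = datetime.now(timezone.utc)

run = DagRunStub()
assert run.state is DagRunState.QUEUED and run.start_date is None
run.scheduler_starts()
assert run.state is DagRunState.RUNNING and run.start_date is not None
```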

Contributor

Supported. Setting it to queued will save us some complaints, especially about max_active_runs.

bhavaniravi (Contributor, Author)

Should I go ahead and make that change? There are three places where we create dag_runs with the state running; should the behavior be the same across the board?


Contributor

We should only change the one identified here. The others look like more involved changes and would require a separate PR (if they should be changed at all).

@bhavaniravi (Contributor, Author) Sep 16, 2021

This change will affect three places. All these use the trigger_dag function.

  1. TriggerDAGRunOperator
  2. APIClient - dag_runs
  3. CLI trigger dag

@kaxil (Member) left a comment

@uranusjr (Member)

Failure seems unrelated:

   _____________________ test_external_task_marker_exception ______________________
  
  dag_bag_ext = <airflow.models.dagbag.DagBag object at 0x7fec5c30c730>
  
      def test_external_task_marker_exception(dag_bag_ext):
          """
          Clearing across multiple DAGs should raise AirflowException if more levels are being cleared
          than allowed by the recursion_depth of the first ExternalTaskMarker being cleared.
          """
  >       run_tasks(dag_bag_ext)
  ...
  >       assert task_instance.state == state
  E       AssertionError: assert None == <TaskInstanceState.SUCCESS: 'success'>
  E        +  where None = <TaskInstance: dag_0.task_b_0 manual__2015-01-01T00:00:00+00:00 [None]>.state

task.run(start_date=execution_date, end_date=execution_date)

with create_session() as session:
    dagruns = session.query(DagRun).filter(DagRun.dag_id == self.dag.dag_id).all()
Member

You probably want to add an explicit order_by here; the DB does not always return rows in a deterministic order.
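The non-determinism point can be demonstrated with stdlib sqlite3 (the test above would use SQLAlchemy's .order_by() on the query; this is only an illustration of why an explicit ordering matters):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER, dag_id TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [(2, "dag_0"), (1, "dag_0"), (3, "dag_0")],
)

# Without ORDER BY, row order is an implementation detail of the engine;
# an explicit ORDER BY makes any assertion on the result deterministic.
rows = conn.execute(
    "SELECT id FROM dag_run WHERE dag_id = ? ORDER BY id",
    ("dag_0",),
).fetchall()
print([r[0] for r in rows])  # [1, 2, 3]
```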

@kaxil kaxil merged commit 6609e9a into apache:main Sep 24, 2021
@kaxil
Copy link
Member

kaxil commented Sep 24, 2021

Thanks @bhavaniravi 🎉


Successfully merging this pull request may close these issues.

TriggerDagRunOperator start_date is not set
5 participants