Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate tasks total priority weight in DagBag #37990

Closed
wants to merge 3 commits into from

Conversation

Taragolis
Copy link
Contributor

@Taragolis Taragolis commented Mar 8, 2024

This one is continuation of #34168.
Validation happen independently of the Database Type and expects that the task's priority_weight_total in int32 range which suits for all databases (Postgres, MySQL, SQLite).

I've also thought to add ability to replace to max allowed value, e.g. lax mode, however it required to workaround tasks Weight Rules and maybe it is better to do it after #36029 completed

ignore - options exists for compatibility with previous behaviour, however it might work only in SQLite which are not suits for the production usage.


ERROR [airflow.models.dagbag.DagBag] Failed to bag_dag: /files/dags/pr_37990.py
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/dagbag.py", line 445, in _process_modules
    self.bag_dag(dag=dag, root_dag=dag)
  File "/opt/airflow/airflow/models/dagbag.py", line 464, in bag_dag
    self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
  File "/opt/airflow/airflow/models/dagbag.py", line 473, in _bag_dag
    check_values_overflow(dag)
  File "/opt/airflow/airflow/utils/dag_parameters_overflow.py", line 50, in check_values_overflow
    raise AirflowDagTaskOutOfBoundsValue(error_msg)
airflow.exceptions.AirflowDagTaskOutOfBoundsValue: Tasks in dag 'pr_37990' exceeds allowed priority weight [-2147483648..2147483647] range: 
 * Task 'stage1' has priority weight 2147483649.
 * Task 'stage2' has priority weight 2147483648.

image


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@Taragolis Taragolis requested review from kaxil, XD-DENG and ashb as code owners March 8, 2024 10:24
@Taragolis Taragolis added the full tests needed We need to run full set of tests for this PR to merge label Mar 8, 2024
@Taragolis Taragolis added this to the Airflow 2.9.0 milestone Mar 8, 2024
@Taragolis Taragolis closed this Mar 8, 2024
@Taragolis Taragolis reopened this Mar 8, 2024
@potiuk
Copy link
Member

potiuk commented Mar 10, 2024

I am ok with having this in, but .. shouldn't we always ignore and put lower/upper bound instead? There is very little value in erroring out - maybe we should log a warning instead and always ignore ? I am very conscious about a new config parameter that literally no-one will use

@Taragolis
Copy link
Contributor Author

I've also think about it that config potentially useless, at least right now. So another option is just errored it in DagBag for now, because in production it will raise an error during put into DB which cause error on scheduler.

With setup upper-lower bound it required to change priority rule to the absolute and value to min/max in current implementation due to the fact that total priority calculated only when it tried to put into the DB


error_msg = (
f"Tasks in dag {dag.dag_id!r} exceeds allowed priority weight "
f"[{_WEIGHT_LOWER_BOUND}..{_WEIGHT_UPPER_BOUND}] range: \n * "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of showing this in decimal, I wonder if the 2-based values would be more user-friendly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better add add thousand separators instead?

Python 3.9.9 (main, Nov  1 2022, 18:58:33) 
[Clang 14.0.0 (clang-1400.0.29.102)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> _WEIGHT_UPPER_BOUND = 2 ** 31 - 1
>>> _WEIGHT_LOWER_BOUND = -(2 ** 31)
>>> f"[{_WEIGHT_LOWER_BOUND:,}..{_WEIGHT_UPPER_BOUND:,}] range:"
'[-2,147,483,648..2,147,483,647] range:'

I'm not sure it is common situation that someone overflow total_priority, it is really hard to exceed priority

Comment on lines +28 to +29
_WEIGHT_UPPER_BOUND = 2147483647 # 2 ** 31 -1
_WEIGHT_LOWER_BOUND = -2147483648 # -(2 ** 31)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_WEIGHT_UPPER_BOUND = 2147483647 # 2 ** 31 -1
_WEIGHT_LOWER_BOUND = -2147483648 # -(2 ** 31)
_WEIGHT_UPPER_BOUND = 2 ** 31 - 1
_WEIGHT_LOWER_BOUND = -(2 ** 31)

@Taragolis
Copy link
Contributor Author

Closed, it should be implemented in top of the #38222

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
full tests needed We need to run full set of tests for this PR to merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants