Validate DAG task_instance.priority_weight column overflow value during import #34168
Conversation
Adding @uranusjr to conversation
_POSTGRES_PRIORITY_WEIGHT_UPPER_BOUND = 2147483647
_POSTGRES_PRIORITY_WEIGHT_LOWER_BOUND = -2147483648
I'm not sure this error is exclusive to Postgres. I did a quick check against the other backends (all except MsSQL):
import pytest

from airflow.operators.empty import EmptyOperator


@pytest.mark.parametrize(
    "overflow_priority_weight",
    [
        pytest.param(2**31 - 1, id="signed-int32-positive-max"),
        pytest.param(-2**31, id="signed-int32-negative-min"),
    ],
)
def test_priority_weight_overflow(dag_maker, overflow_priority_weight):
    # Test that summing up priority_weight_total overflows int32
    with dag_maker(dag_id="test_priority_weight_sum_up_overflow"):
        op1 = EmptyOperator(task_id="stage1", priority_weight=10 if overflow_priority_weight > 0 else -10)
        op2 = EmptyOperator(task_id="stage2", priority_weight=overflow_priority_weight)
        op1 >> op2
    # MySQL and Postgres: Fail
    # SQLite: Fine, it uses int64 (I'm not sure whether this depends on the platform)
    # MsSQL, who cares? https://lists.apache.org/thread/pgcgmhf6560k8jbsmz8nlyoxosvltph2
    dag_maker.create_dagrun(run_id="foo-bar")


@pytest.mark.parametrize(
    "test_priority_weight",
    [
        pytest.param(2**15 - 1, id="signed-int16-positive-max"),
        pytest.param(-2**15, id="signed-int16-negative-min"),
    ],
)
def test_priority_weight_fine(dag_maker, test_priority_weight):
    # Test that values within the int16 range do not overflow
    with dag_maker(dag_id="test_priority_weight_sum_up_overflow"):
        op1 = EmptyOperator(task_id="stage1", priority_weight=10 if test_priority_weight > 0 else -10)
        op2 = EmptyOperator(task_id="stage2", priority_weight=test_priority_weight)
        op1 >> op2
    # MySQL, Postgres, SQLite: Fine
    dag_maker.create_dagrun(run_id="foo-bar")


@pytest.mark.parametrize(
    "overflow_priority_weight",
    [
        pytest.param(2**63 - 1, id="signed-int64-positive-max"),
        pytest.param(-2**63, id="signed-int64-negative-min"),
    ],
)
def test_priority_weight_overflow_bigint(dag_maker, overflow_priority_weight):
    # Test that summing up priority_weight_total overflows int64
    with dag_maker(dag_id="test_priority_weight_sum_up_overflow"):
        op1 = EmptyOperator(task_id="stage1", priority_weight=10 if overflow_priority_weight > 0 else -10)
        op2 = EmptyOperator(task_id="stage2", priority_weight=overflow_priority_weight)
        op1 >> op2
    # MySQL, Postgres, SQLite: Fail
    dag_maker.create_dagrun(run_id="foo-bar")
MySQL error (overflow int32)
E sqlalchemy.exc.DataError: (MySQLdb.DataError) (1264, "Out of range value for column 'priority_weight' at row 1")
E [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)]
E [parameters: (('stage1', 'test_priority_weight_sum_up_overflow', 'foo-bar', -1, 0, 0, '', 'root', 'default_pool', 1, 'default', 2147483657, 'EmptyOperator', b'\x80\x05}\x94.', datetime.datetime(2023, 9, 7, 11, 37, 48, 471702)), ('stage2', 'test_priority_weight_sum_up_overflow', 'foo-bar', -1, 0, 0, '', 'root', 'default_pool', 1, 'default', 2147483647, 'EmptyOperator', b'\x80\x05}\x94.', datetime.datetime(2023, 9, 7, 11, 37, 48, 471706)))]
E (Background on this error at: https://sqlalche.me/e/14/9h9h)
SQLite Overflow Error (overflow int64)
def do_executemany(self, cursor, statement, parameters, context=None):
> cursor.executemany(statement, parameters)
E OverflowError: Python int too large to convert to SQLite INTEGER
I guess we could hardcode the check to int32 for all databases, even SQLite. SQLite is not intended for production use, and even if a DAG passes in development it would fail in production where Postgres or MySQL is used.
But to decide whether this is applicable and not a breaking change we need expert advice here. cc: @potiuk
Or, I've just thought of another option which might make most users happy: introduce a new config option, something like [core] task_weight_rule_validation,
with 3 possible values (might be strings, or bool + None); a rough sketch follows the list:
- None (default): keep it as it is, the user should manually control it based on the DB backend. Basically how it works right now.
- lax: if priority_weight overflows int32 (or int16, we could choose either), replace it with the closest allowed value.
- strict: if priority_weight overflows int32 (or int16, we could choose either), raise an error.
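A minimal sketch of how such a setting could behave, assuming int32 bounds; the helper name apply_weight_validation and the clamping details are illustrative assumptions, not an agreed design:

from typing import Optional

# Illustrative sketch only: possible behaviour of the proposed
# [core] task_weight_rule_validation option. Names here are hypothetical.
INT32_MAX = 2**31 - 1
INT32_MIN = -(2**31)


def apply_weight_validation(priority_weight: int, mode: Optional[str]) -> int:
    if mode is None:
        # Current behaviour: leave the value alone and let the DB backend fail on overflow.
        return priority_weight
    if mode == "lax":
        # Replace an overflowing value with the closest allowed value.
        return max(INT32_MIN, min(INT32_MAX, priority_weight))
    if mode == "strict":
        if not INT32_MIN <= priority_weight <= INT32_MAX:
            raise ValueError(f"priority_weight={priority_weight} overflows a signed 32-bit integer")
        return priority_weight
    raise ValueError(f"Unknown task_weight_rule_validation mode: {mode!r}")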
Thanks for the comments @Taragolis. Yes, it does affect other DB backends, but I wanted to start with Postgres as indicated in the original PR discussion. I have no problem if we add it for the other engines now, though.
And I've just tested against MsSQL; it is also limited to int32:
context = <sqlalchemy.dialects.mssql.pyodbc.MSExecutionContext_pyodbc object at 0x7f452060bdc0>
def do_executemany(self, cursor, statement, parameters, context=None):
> cursor.executemany(statement, parameters)
E OverflowError: int too big to convert
In conclusion, only SQLite supports int64 for Integer; all the other backends are limited to int32.
> Thanks for the comments @Taragolis. Yes, it does affect other DB backends, but I wanted to start with Postgres as indicated in the original PR discussion. I have no problem if we add it for the other engines now, though.
Anything we do MUST be added for all supported backends, unless it is a specific optimisation for a specific backend.
@potiuk , @Taragolis
So, should we set it to 64-bit for MsSQL and 32-bit for the others, or just 32-bit for all?
The config option suggested by @Taragolis is also interesting.
Another drafted idea (needs more thinking) to avoid doing this for a specific column:
save_point = session.begin_nested()  # create a savepoint to keep the DagRun; otherwise the exception causes a rollback and we lose it
try:
    if hook_is_noop:
        session.bulk_insert_mappings(TI, tasks)
    else:
        session.bulk_save_objects(tasks)
    for task_type, count in created_counts.items():
        Stats.incr(f"task_instance_created_{task_type}", count, tags=self.stats_tags)
        # Same metric with tagging
        Stats.incr("task_instance_created", count, tags={**self.stats_tags, "task_type": task_type})
    session.flush()
except IntegrityError:
    ..
    ..
except DataError:
    self.log.info(
        "Hit DataError while creating the TIs for %s- %s",
        dag_id,
        run_id,
        exc_info=True,
    )
    self.log.info("DataError .......")
    session.rollback()  # it will keep the DagRun so that we can mark it as failed
    self.log.error("Marking run %s failed", self)
    self.set_state(DagRunState.FAILED)
    self.notify_dagrun_state_changed(msg="task_failure")
    session.flush()
One caveat is that the task instance itself will have no logs since it was not inserted in the db.
The idea of catching IntegrityError could have different side effects, and in general we would need to check the DB-specific error code, otherwise we could catch something we did not want to.
I still think the approach with rules might give better flexibility for maintainers / users.
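For illustration, a hedged sketch of such a DB-specific check for MySQL, where error code 1264 is "Out of range value for column" (matching the MySQLdb traceback earlier in this thread); the helper name is hypothetical and other backends would need their own codes:

from sqlalchemy.exc import DataError

# 1264 is MySQL's "Out of range value for column" error code, as seen in the
# traceback above; Postgres and SQLite would need their own checks.
MYSQL_OUT_OF_RANGE = 1264


def is_out_of_range_error(err: DataError) -> bool:
    """Return True only for the overflow error we actually intend to handle."""
    # err.orig is the underlying DBAPI exception; for MySQLdb, args[0] is the server error code.
    args = getattr(err.orig, "args", ())
    return bool(args) and args[0] == MYSQL_OUT_OF_RANGE


# Possible usage inside the drafted except block above:
#     except DataError as err:
#         if not is_out_of_range_error(err):
#             raise  # not the overflow we expected; do not swallow it
#         session.rollback()
#         ...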
@Taragolis I can work on that approach if it is considered the way to go, but I would like to hear from the other reviewers before putting effort into this. Basically, get an agreement.
Since we’re dropping MSSQL support soon™ we should just go with 32-bit. Let’s do this.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Closes: #22781
This is a follow-up implementation of the fix during DAG import discussed in #22784. As discussed in that PR, the solution should cover PostgreSQL, at least for the moment.
I did some research on whether SQLAlchemy allows getting the maximum value allowed by a column, but did not find anything to achieve that, so I set some upper and lower bound variables.
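For illustration, a minimal sketch of what a check built on those bounds could look like; the helper name is hypothetical and the actual PR code may differ:

# Hypothetical sketch: the bounds are the signed 32-bit integer range of the
# task_instance.priority_weight column.
_POSTGRES_PRIORITY_WEIGHT_UPPER_BOUND = 2**31 - 1   # 2147483647
_POSTGRES_PRIORITY_WEIGHT_LOWER_BOUND = -(2**31)    # -2147483648


def _check_priority_weight(total_weight: int) -> None:
    if not _POSTGRES_PRIORITY_WEIGHT_LOWER_BOUND <= total_weight <= _POSTGRES_PRIORITY_WEIGHT_UPPER_BOUND:
        raise ValueError(
            f"Calculated priority_weight {total_weight} does not fit into the "
            "task_instance.priority_weight column (signed 32-bit integer)"
        )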
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.