Skip to content

Commit

Permalink
Resolve SA warnings in migrations scripts (#39418)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5e58723)
  • Loading branch information
Taragolis authored and utkarsharma2 committed Jun 5, 2024
1 parent c2f93bd commit b8b845a
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ def upgrade():
constraints = get_mssql_table_constraints(conn, "task_instance")
pk, _ = constraints["PRIMARY KEY"].popitem()
batch_op.drop_constraint(pk, type_="primary")
elif dialect_name not in ("sqlite"):
batch_op.drop_constraint("task_instance_pkey", type_="primary")
batch_op.drop_constraint("task_instance_pkey", type_="primary")
batch_op.drop_index("ti_dag_date")
batch_op.drop_index("ti_state_lkp")
batch_op.drop_column("execution_date")
Expand Down Expand Up @@ -401,7 +400,7 @@ def _multi_table_update(dialect_name, target, column):
if dialect_name == "sqlite":
# Most SQLite versions don't support multi table update (and SQLA doesn't know about it anyway), so we
# need to do a Correlated subquery update
sub_q = select(dag_run.c[column.name]).where(condition)
sub_q = select(dag_run.c[column.name]).where(condition).scalar_subquery()

return target.update().values({column: sub_q})
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,14 @@ def upgrade():
xcom = Table("xcom", metadata, *_get_old_xcom_columns())
dagrun = _get_dagrun_table()
query = select(
[
dagrun.c.id,
xcom.c.task_id,
xcom.c.key,
xcom.c.value,
xcom.c.timestamp,
xcom.c.dag_id,
dagrun.c.run_id,
literal_column("-1"),
],
dagrun.c.id,
xcom.c.task_id,
xcom.c.key,
xcom.c.value,
xcom.c.timestamp,
xcom.c.dag_id,
dagrun.c.run_id,
literal_column("-1"),
).select_from(
xcom.join(
right=dagrun,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def _multi_table_update(dialect_name, target, column):
if dialect_name == "sqlite":
# Most SQLite versions don't support multi table update (and SQLA doesn't know about it anyway), so we
# need to do a Correlated subquery update
sub_q = select(dag_run.c[column.name]).where(condition)
sub_q = select(dag_run.c[column.name]).where(condition).scalar_subquery()

return target.update().values({column: sub_q})
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _update_value_from_dag_run(
if dialect_name == "sqlite":
# Most SQLite versions don't support multi table update (and SQLA doesn't know about it anyway), so we
# need to do a Correlated subquery update
sub_q = select(dag_run.c[target_column.name]).where(condition)
sub_q = select(dag_run.c[target_column.name]).where(condition).scalar_subquery()

return target_table.update().values({target_column: sub_q})
else:
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
00b4a76587ea1f5f2f317a555a083a0b4964c727d0a3c8c2044ac4f19c58abd4
a82d08fd5d725c40dab022228ac57a514851c1718c21fe8712ff5f12698b5d1f
4 changes: 2 additions & 2 deletions docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit b8b845a

Please sign in to comment.