
Migrate from 2.1.4 to 2.2.0 #18894

Closed
1 of 2 tasks
sbialkowski-pixel opened this issue Oct 11, 2021 · 33 comments · Fixed by #18953
Labels
affected_version:2.2 Issues Reported for 2.2 area:upgrade Facilitating migration to a newer version of Airflow kind:documentation

Comments

@sbialkowski-pixel

sbialkowski-pixel commented Oct 11, 2021

Apache Airflow version

2.2.0

Operating System

Linux

Versions of Apache Airflow Providers

default.

Deployment

Docker-Compose

Deployment details

Using apache/airflow:2.2.0-python3.7

What happened

Upgrading the image from apache/airflow:2.1.4-python3.7
to apache/airflow:2.2.0-python3.7
causes this error inside the scheduler, which does not start:

Python version: 3.7.12
Airflow version: 2.2.0
Node: 6dd55b0a5dd7
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column dag.max_active_tasks does not exist
LINE 1: ..., dag.schedule_interval AS dag_schedule_interval, dag.max_ac...
                                                             ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/www/auth.py", line 51, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/www/views.py", line 588, in index
    filter_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/www/security.py", line 377, in get_accessible_dag_ids
    return {dag.dag_id for dag in accessible_dags}
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.max_active_tasks does not exist
LINE 1: ..., dag.schedule_interval AS dag_schedule_interval, dag.max_ac...
                                                             ^

[SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_parsed_time AS dag_last_parsed_time, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag.max_active_tasks AS dag_max_active_tasks, dag.max_active_runs AS dag_max_active_runs, dag.has_task_concurrency_limits AS dag_has_task_concurrency_limits, dag.next_dagrun AS dag_next_dagrun, dag.next_dagrun_data_interval_start AS dag_next_dagrun_data_interval_start, dag.next_dagrun_data_interval_end AS dag_next_dagrun_data_interval_end, dag.next_dagrun_create_after AS dag_next_dagrun_create_after 
FROM dag]
(Background on this error at: http://sqlalche.me/e/13/f405)

What you expected to happen

Automatic database migration and properly working scheduler.

How to reproduce

Upgrade from 2.1.4 to 2.2.0 with some DAG run history.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@sbialkowski-pixel sbialkowski-pixel added area:core kind:bug This is a clearly a bug labels Oct 11, 2021
@boring-cyborg

boring-cyborg bot commented Oct 11, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@ephraimbuddy
Contributor

Did you run airflow db upgrade?

@sbialkowski-pixel
Author

sbialkowski-pixel commented Oct 11, 2021

Yes, I did. Got this:

airflow@6dd55b0a5dd7:/opt/airflow$ airflow db upgrade
/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:357
DeprecationWarning: The dag_concurrency option in [core] has been renamed to max_active_tasks_per_dag - the old setting has been used, but please update your config.
DB: postgresql+psycopg2://airflow:***@postgres/airflow
/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:3573 
SAWarning: Predicate of partial index idx_dag_run_queued_dags ignored during reflection
/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:3573 
SAWarning: Predicate of partial index idx_dag_run_running_dags ignored during reflection
[2021-10-11 19:21:13,069] {db.py:815} ERROR - Automatic migration is not available
[2021-10-11 19:21:13,069] {db.py:817} ERROR - The task_instance table has 53 rows without a corresponding dag_run row. You must manually correct this problem (possibly by deleting the problem rows).
[2021-10-11 19:21:13,071] {db.py:817} ERROR - The task_fail table has 24 rows without a corresponding dag_run row. You must manually correct this problem (possibly by deleting the problem rows).

I just killed my docker-compose stack running 2.1.4, changed the image version to 2.2.0, and Airflow never started again :)
Downgrading to 2.1.4 fixes the problem :)

@potiuk
Member

potiuk commented Oct 11, 2021

Did you do the suggested fix?

The task_instance table has 53 rows without a corresponding dag_run row. You must manually correct this problem (possibly by deleting the problem rows).
The task_fail table has 24 rows without a corresponding dag_run row. You must manually correct this problem (possibly by deleting the problem rows).

@sbialkowski-pixel
Author

Not yet, but I have been upgrading to each new release since version 2.0.2, and this is the first time I'm getting a migration problem, so I thought it was a bigger issue. I especially want to upgrade the version in my production environment, and it seems it will be problematic.

@potiuk
Member

potiuk commented Oct 11, 2021

Well, I think you should follow the suggestion. You seem to have some wrong entries in your DB (which might be the result of bugs, some very old versions of Airflow, or both).

Closing as invalid unless following the migration suggestion does not fix the problem (you will likely have to make similar fixes in production, BTW).

@potiuk potiuk closed this as completed Oct 11, 2021
@potiuk potiuk added the invalid label Oct 11, 2021
@sbialkowski-pixel
Author

After deleting the wrong rows, Airflow is working.
Thank you for the help.
This can be closed.

@RenGeng
Contributor

RenGeng commented Oct 12, 2021

Hi @sbialkowski-pixel, do you have the query to delete those rows in the Postgres console? I'm also facing this issue.

@sbialkowski-pixel
Author

sbialkowski-pixel commented Oct 12, 2021

I've deleted that query, sorry.
But it was inspired by how the check raises this error, so I found where in the Airflow code the error is produced:

airflow/airflow/utils/db.py

Lines 766 to 774 in bc19ae7

join_cond = and_(table.c.dag_id == target.c.dag_id, table.c.execution_date == target.c.execution_date)
if "task_id" in target.columns:
    join_cond = and_(join_cond, table.c.task_id == target.c.task_id)
query = (
    session.query(table.c.dag_id, table.c.task_id, table.c.execution_date)
    .select_from(outerjoin(table, target, join_cond))
    .filter(target.c.dag_id.is_(None))
)  # type: ignore

And I recreated this in SQL.
If some references are lost, this kind of error should be repaired automatically IMHO.
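
For reference, a rough SQL equivalent of that check for the task_instance table (a sketch only, assuming Postgres; swap in task_fail to check the other table):

-- List task_instance rows with no corresponding dag_run row
-- (mirrors the outer join + IS NULL filter in airflow/utils/db.py)
SELECT task_instance.dag_id, task_instance.task_id, task_instance.execution_date
FROM task_instance
LEFT JOIN dag_run
  ON dag_run.dag_id = task_instance.dag_id
  AND dag_run.execution_date = task_instance.execution_date
WHERE dag_run.dag_id IS NULL;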

@uranusjr
Member

If some references are lost, this kind of error should be repaired automatically IMHO.

Airflow does not delete user-generated data because we don't know whether it is important to the user. It would be a disaster if we automatically deleted the offending records when the user deliberately added them and did not get a chance to move them somewhere else.

For the same reason, please inspect and fully understand any SQL queries suggested in this issue before running them, to make sure you are actually fine with those rows being deleted.

@sbialkowski-pixel
Author

The problem is that there is no clue as to which rows need to be deleted. Also, the logic behind "corresponding rows" is not obvious to me.

This part of the code is the only source of guidance for me.

If there is a better way to deal with this kind of problem in the future, I would appreciate hearing it :)

@sbialkowski-pixel
Author

If some references are lost, this kind of error should be repaired automatically IMHO.

Airflow does not delete user-generated data because we don't know whether it is important to the user. It would be a disaster if we automatically deleted the offending records when the user deliberately added them and did not get a chance to move them somewhere else.

For the same reason, please inspect and fully understand any SQL queries suggested in this issue before running them, to make sure you are actually fine with those rows being deleted.

OK. But if a failed task no longer references a dag_run, there is no point in keeping it in the database...
Or maybe there is some logic behind this that I don't get.

@potiuk
Member

potiuk commented Oct 12, 2021

If some references are lost, this kind of error should be repaired automatically IMHO.

What's the reason for those rows to appear in the DB? How did they get there? Do we know?

I think there are two possible scenarios:

  1. They are the result of normal operation of Airflow (even under some special circumstances, e.g. DAGs triggered manually via the CLI or UI).

If that's the case, I agree it should be handled better - Airflow cleaning them up, or adding "fake" run_ids for them during migration, is what I would expect, as we should handle this as a "regular" migration scenario.

  2. They are the result of the user manually adding entries to the database.

If this is the case, then the BEST we could do is spit out the exact SQL query to run to delete those rows. We should not delete them automatically.

@potiuk
Member

potiuk commented Oct 12, 2021

It seems other people have a similar issue (#18912), so to me it looks highly unlikely this was manually added.

I think the number of reports we have in such a short time (I have seen 3 reports already) indicates that those rows can appear frequently as a result of normal Airflow operations, and many users might have similar issues soon.

If this really is the result of "regular" Airflow behaviour, to me it calls for a very quick 2.2.1 with an improved migration to handle that case (cc: @kaxil @jedcunningham)

@potiuk potiuk reopened this Oct 12, 2021
@sbialkowski-pixel
Author

If some references are lost, this kind of error should be repaired automatically IMHO.

What's the reason for those rows to appear in the DB? How did they get there? Do we know?

I think there are two possible scenarios:

  1. They are the result of normal operation of Airflow (even under some special circumstances, e.g. DAGs triggered manually via the CLI or UI).

If that's the case, I agree it should be handled better - Airflow cleaning them up, or adding "fake" run_ids for them during migration, is what I would expect, as we should handle this as a "regular" migration scenario.

  2. They are the result of the user manually adding entries to the database.

If this is the case, then the BEST we could do is spit out the exact SQL query to run to delete those rows. We should not delete them automatically.

Case 1: Yes, I trigger DAGs manually from the UI or via the REST API.
Case 2: This was the very first time I had to work with the Airflow database. I have never added to or modified it manually.
I only read connections from the DB.

@potiuk
Member

potiuk commented Oct 12, 2021

Case 2: This was the very first time I had to work with the Airflow database. I have never added to or modified it manually.
I only read connections from the DB.

Thanks for confirming, @sbialkowski-pixel! @kaxil @jedcunningham I think we need to seriously consider 2.2.1

@leonsmith
Contributor

leonsmith commented Oct 12, 2021

SQL we used from our deployment playbook to clean up these tables.

Obviously, a disclaimer on running this blindly: first validate that these are rows you can safely remove, by executing just the CTE SELECTs.

BEGIN;

-- Remove dag runs without a valid run_id
DELETE FROM dag_run WHERE run_id is NULL;

-- Remove task fails without a run_id
WITH task_fails_to_remove AS (
  SELECT 
    task_fail.dag_id,
    task_fail.task_id,
    task_fail.execution_date
  FROM
    task_fail
  LEFT JOIN 
    dag_run ON 
    dag_run.dag_id = task_fail.dag_id 
    AND dag_run.execution_date = task_fail.execution_date
  WHERE
    dag_run.run_id IS NULL
)
DELETE FROM
    task_fail
USING 
    task_fails_to_remove
WHERE (
    task_fail.dag_id = task_fails_to_remove.dag_id
    AND task_fail.task_id = task_fails_to_remove.task_id
    AND task_fail.execution_date = task_fails_to_remove.execution_date
);

-- Remove task instances without a run_id
WITH task_instances_to_remove AS (
  SELECT
    task_instance.dag_id,
    task_instance.task_id,
    task_instance.execution_date
  FROM
    task_instance
  LEFT JOIN 
    dag_run 
    ON dag_run.dag_id = task_instance.dag_id
    AND dag_run.execution_date = task_instance.execution_date
  WHERE 
    dag_run.run_id is NULL
)
DELETE FROM 
    task_instance
USING
    task_instances_to_remove
WHERE (
    task_instance.dag_id = task_instances_to_remove.dag_id
    AND task_instance.task_id = task_instances_to_remove.task_id
    AND task_instance.execution_date = task_instances_to_remove.execution_date
);

COMMIT;

@uranusjr
Member

uranusjr commented Oct 12, 2021

You might also want to remove DAG runs with a NULL execution date; I think 2.2 also started enforcing those (and I think their existence might mess up some clauses in the SQL script; not sure).

I still feel we shouldn't blindly delete records, but a write-up in the documentation and better error messaging should be added to guide people through the cleanup.
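
As a sketch of that extra cleanup step (my assumption of what removing those runs would look like in SQL; review the affected rows with a SELECT first):

-- Remove dag runs with a NULL execution date
DELETE FROM dag_run WHERE execution_date IS NULL;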

@potiuk
Member

potiuk commented Oct 12, 2021

How old were your databases, @RenGeng @sbialkowski-pixel? Did you upgrade to Airflow 2 from 1.10 before?

@sbialkowski-pixel
Author

I started from 2.1.2, then upgraded to 2.1.4, and now 2.2.0.

@RenGeng
Contributor

RenGeng commented Oct 12, 2021

We started using Airflow a year ago with 1.10 and have updated to every new version as it was released.

@jedcunningham
Member

@sbialkowski-pixel @RenGeng, thanks! Would either of you happen to have an example execution_date from one of those task_instance rows (or better yet, a couple)? I'm mostly interested in the sub-second precision, e.g. .976545 indicating a "now" run or .000000 indicating a scheduled run.
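
One way to eyeball that (a sketch, assuming Postgres; to_char with the US pattern keeps the microsecond part visible):

-- Show orphaned task_instance rows with full sub-second precision
SELECT task_instance.dag_id,
       task_instance.task_id,
       to_char(task_instance.execution_date, 'YYYY-MM-DD HH24:MI:SS.US') AS execution_date
FROM task_instance
LEFT JOIN dag_run
  ON dag_run.dag_id = task_instance.dag_id
  AND dag_run.execution_date = task_instance.execution_date
WHERE dag_run.run_id IS NULL;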

@zachliu
Contributor

zachliu commented Oct 12, 2021

I just ran this in my prod DB:

SELECT task_instance.execution_date,
       task_instance.task_id,
       task_instance.dag_id
FROM task_instance
LEFT JOIN dag_run ON task_instance.dag_id = dag_run.dag_id
AND task_instance.execution_date = dag_run.execution_date
WHERE dag_run.run_id IS NULL;

My execution_date values all look like scheduled runs, I think:

2020-01-12 00:00:00+00
2020-09-09 18:00:00+00
2020-09-09 18:00:00+00
2020-09-09 18:00:00+00
2020-09-09 18:00:00+00
2020-09-09 18:00:00+00
2020-09-09 18:00:00+00
2020-01-15 13:30:00+00
2020-01-15 13:30:00+00
2020-01-15 13:30:00+00
2020-01-15 13:30:00+00

Then I just ran:

DELETE
FROM task_instance ti USING
  (SELECT task_instance.execution_date,
          task_instance.task_id,
          task_instance.dag_id
   FROM task_instance
   LEFT JOIN dag_run ON task_instance.dag_id = dag_run.dag_id
   AND task_instance.execution_date = dag_run.execution_date
   WHERE dag_run.run_id IS NULL) trash
WHERE ti.dag_id = trash.dag_id
  AND ti.task_id = trash.task_id
  AND ti.execution_date = trash.execution_date;

No problems in the migration after that.

@uranusjr
Member

uranusjr commented Oct 12, 2021

Also, I'd be interested to know whether you have ever deleted any DAG runs from the web UI (or whether you can identify any execution_date values in task_instance, task_fail, etc. that correspond to those deleted DAG runs; I'm fully aware this is a hard ask though). IIRC there was a bug that caused task instances to stay in the database when a DAG run is deleted.

@uranusjr uranusjr removed the kind:bug This is a clearly a bug label Oct 12, 2021
@uranusjr uranusjr added area:upgrade Facilitating migration to a newer version of Airflow kind:documentation and removed invalid area:core labels Oct 12, 2021
@kaxil kaxil added the affected_version:2.2 Issues Reported for 2.2 label Oct 13, 2021
@zachliu
Contributor

zachliu commented Oct 13, 2021

Slack doesn't keep historical records, so I'm transferring my comments here:

My Airflow journey started with 1.10.4:

1.10.4 -> 1.10.5 -> 1.10.7 -> 1.10.9 -> 1.10.10 -> 1.10.11 -> 1.10.12 -> 1.10.13 -> 1.10.14 -> 2.0.0 -> 2.0.1 -> 2.0.2 -> 2.1.0 -> 2.1.1 -> 2.1.2 -> 2.1.3 -> 2.1.4 -> 2.2.0

@konfusator


LEFT JOIN
dag_run ON
dag_run.dag_id = task_fail.dag_id
AND dr.execution_date = task_fail.execution_date

This should read »dag_run.execution_date«

@MM-Lehmann

MM-Lehmann commented Oct 13, 2021 via email

@mhaalme

mhaalme commented Oct 14, 2021

Slack doesn't keep historical records, so I'm transferring my comments here:

My Airflow journey started with 1.10.4:

1.10.4 -> 1.10.5 -> 1.10.7 -> 1.10.9 -> 1.10.10 -> 1.10.11 -> 1.10.12 -> 1.10.13 -> 1.10.14 -> 2.0.0 -> 2.0.1 -> 2.0.2 -> 2.1.0 -> 2.1.1 -> 2.1.2 -> 2.1.3 -> 2.1.4 -> 2.2.0

#15986 might indeed have something to do with it.

I just deleted a few DAG runs on an Airflow instance which is still running 2.1.0, and I can confirm that the task instances from those runs remain in the DB and show up in the query used to identify dangling rows:

SELECT task_instance.execution_date, task_instance.task_id, task_instance.dag_id
FROM task_instance
LEFT JOIN dag_run
  ON task_instance.dag_id = dag_run.dag_id
  AND task_instance.execution_date = dag_run.execution_date
WHERE dag_run.run_id IS NULL

All rows showing up in the query for this instance are related to DAGs where I have deleted DAG runs in the past, including scheduled and manual runs.

@troyharvey
Contributor

⬆️ We are seeing this while trying to upgrade a (new) Airflow 2.1.4 project.

@ldacey
Contributor

ldacey commented Oct 15, 2021

This impacted me. The one thing that stood out was that most of the execution dates were from a period I definitely remember deleting. There was a DAG which downloaded data incrementally every 30 minutes (always downloading data > the previous max ID), and I needed to clear a month of the data so it could start downloading again from August onwards. I deleted those runs using the UI, so perhaps that is why?

@jedcunningham
Member

@ldacey, yep, deleting dagruns via the UI turns out to be a common way to hit this (it's been a bug for a while it seems: #15986).

@jasonoberme

For anyone experiencing this when testing standalone and using SQLite, delete these ids:

SELECT fail_id FROM (
  (SELECT id AS fail_id, dag_id, execution_date FROM task_fail) AS fail
  LEFT JOIN
  (SELECT dag_id, execution_date FROM dag_run) AS run
  ON run.dag_id = fail.dag_id AND fail.execution_date = run.execution_date
) WHERE run.dag_id IS NULL
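
That query only lists the offending ids; a sketch of the corresponding delete (my addition, same SQLite assumptions; review the selected ids first):

-- Delete the task_fail rows identified by the query above
DELETE FROM task_fail WHERE id IN (
  SELECT fail_id FROM (
    (SELECT id AS fail_id, dag_id, execution_date FROM task_fail) AS fail
    LEFT JOIN
    (SELECT dag_id, execution_date FROM dag_run) AS run
    ON run.dag_id = fail.dag_id AND fail.execution_date = run.execution_date
  ) WHERE run.dag_id IS NULL
);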

@aprettyloner

SQL we used from our deployment playbook to clean up these tables […]

Thank you for this @leonsmith ! Made our team's upgrade from v2.1.4 to v2.2.3 very clean and simple! 🚀 🚀 🚀 🚀
