sqlalchemy error when running CLI command airflow tasks test #34109

Closed
1 of 2 tasks
mbarugelCA opened this issue Sep 5, 2023 · 13 comments · Fixed by #34120
Assignees
Labels
affected_version:main_branch Issues Reported for main branch area:CLI good first issue kind:bug This is a clearly a bug

Comments

@mbarugelCA

Apache Airflow version

2.7.0

What happened

Calling airflow tasks test <dag_id> <task_id> runs the task correctly, but yields the following error after it's done (regardless of failure/success of the task):

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 60, in main
    args.func(args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/task_command.py", line 633, in task_test
    with create_session() as session:
  File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 37, in create_session
    session.commit()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 667, in execute
    util.preloaded.orm_persistence.delete_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 330, in delete_obj
    table_to_mapper = base_mapper._sorted_tables
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 1184, in __get__
    obj.__dict__[self.__name__] = result = self.fget(obj)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/mapper.py", line 3386, in _sorted_tables
    sorted_ = sql_util.sort_tables(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/ddl.py", line 1217, in sort_tables
    for (t, fkcs) in sort_tables_and_constraints(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/ddl.py", line 1289, in sort_tables_and_constraints
    filtered = filter_fn(fkc)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/ddl.py", line 1207, in _skip_fn
    if skip_fn(fk):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/mapper.py", line 3369, in skip
    dep = table_to_mapper.get(fk.column.table)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 1113, in __get__
    obj.__dict__[self.__name__] = result = self.fget(obj)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/schema.py", line 2532, in column
    return self._resolve_column()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/schema.py", line 2543, in _resolve_column
    raise exc.NoReferencedTableError(
sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'

What you think should happen instead

This error should never happen. It wasn't happening with version 2.5.3, which I was running before.

How to reproduce

Create this DAG file:

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

default_args = {
    "depends_on_past": False,
    "start_date": datetime(2022, 3, 9, 0, 0),
}

with DAG(
    dag_id="my_dag",
    default_args=default_args,
    catchup=False,
    schedule=None,
    max_active_runs=1,
    ) as dag:

    @task
    def my_task():
        print("Done")

    my_task()

Then run airflow tasks test my_dag my_task

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.5.1
apache-airflow-providers-celery==3.3.2
apache-airflow-providers-common-sql==1.7.0
apache-airflow-providers-ftp==3.5.0
apache-airflow-providers-google==10.6.0
apache-airflow-providers-http==4.5.0
apache-airflow-providers-imap==3.3.0
apache-airflow-providers-jdbc==4.0.1
apache-airflow-providers-mysql==5.2.1
apache-airflow-providers-postgres==5.6.0
apache-airflow-providers-sftp==4.5.0
apache-airflow-providers-slack==7.3.2
apache-airflow-providers-snowflake==4.4.2
apache-airflow-providers-sqlite==3.4.3
apache-airflow-providers-ssh==3.7.1

Deployment

Docker-Compose

Deployment details

No response

Anything else

This error always happens if you run airflow tasks test my_dag my_task

If you pass an execution date, it will yield the error the first time, but then it won't error out again:

airflow tasks test my_dag my_task 2023-09-01  # error
airflow tasks test my_dag my_task 2023-09-01  # second time: no error
airflow tasks test my_dag my_task 2023-09-01  # 3rd time: no error
airflow tasks test my_dag my_task 2023-09-02  # changed datetime: error

I've seen this error with both Postgres and SQLite as the backend.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@mbarugelCA mbarugelCA added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Sep 5, 2023
@boring-cyborg

boring-cyborg bot commented Sep 5, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@hussein-awala hussein-awala added area:CLI affected_version:main_branch Issues Reported for main branch and removed area:core needs-triage label for new issues that we didn't triage yet labels Sep 5, 2023
@hussein-awala hussein-awala self-assigned this Sep 5, 2023
@freeduck

freeduck commented Sep 6, 2023

I am experiencing the same issue

@potiuk
Member

potiuk commented Sep 6, 2023

> I am experiencing the same issue

There is very little value in stating it, @freeduck. But I think there is huge value in checking whether the fix proposed in the linked PR resolves it. Can you please apply the fix in #34120 to your installation and verify whether it solves the problem? That would help a lot in finding out whether the fix is good.

Can we count on your help with that, @freeduck?

@kuikeelc

The issue seems to still exist in v2.7.2

@hussein-awala
Member

> The issue seems to still exist in v2.7.2

Yes, we already detected it in the RC (#34830 (comment)).

@kuikeelc

kuikeelc commented Nov 7, 2023

The issue seems to still exist in v2.7.3.

@Kache
Contributor

Kache commented Nov 13, 2023

I believe I have a fix as well as a workaround. Create the following as the plugin file, plugins/workaround_dag_run_note_model_init.py:

"""
Workaround for: https://github.com/apache/airflow/issues/34109

DagRunNote has a foreign key `user_id` (and thus a dependency) to User, but it
seems `airflow.models.dagrun` gets loaded first (at least when running `airflow
tasks test DAG_ID TASK_ID`). Loading User first seems to solve the issue.
"""

from airflow.auth.managers.fab.models import User  # noqa

# UPDATE: don't import the following
# it'll prematurely import airflow.configuration, triggering additional import side-effects
# from airflow.models.dagrun import DagRunNote  # noqa
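
A minimal sketch of the same workaround for installations where the module path may differ. The first candidate path is the one used above; the second is an assumption about where the FAB `User` model lives when the FAB auth manager is installed as a provider package, so verify it against your installation. The helper name `import_first_available` is hypothetical.

```python
"""Variant of the plugin workaround: import whichever FAB models module exists."""
import importlib

# First entry: the path used in this thread.
# Second entry: an ASSUMPTION about the provider-package location; verify locally.
CANDIDATE_MODULES = (
    "airflow.auth.managers.fab.models",
    "airflow.providers.fab.auth_manager.models",
)


def import_first_available(module_names):
    """Return the first module that imports successfully, or None if none do."""
    for name in module_names:
        try:
            return importlib.import_module(name)
        except ImportError:
            # Covers ModuleNotFoundError too; move on to the next candidate.
            continue
    return None


# Importing the module is what matters: it registers the User model's table
# before the DagRunNote foreign key needs to resolve it.
user_models = import_first_available(CANDIDATE_MODULES)
```

Catching `ImportError` this broadly also hides failures raised inside a candidate module, so log the exception if the workaround appears to have no effect.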

The equivalent fix would be to encode the dependency into the foreign key definition, establishing proper import ordering.

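from sqlalchemy import Column, ForeignKey, Integer

from airflow.auth.managers.fab.models import User
from airflow.models.base import Base

class DagRunNote(Base):
    # Excerpt: only the changed column is shown; the rest of the model is unchanged.
    user_id = Column(
        Integer,
        ForeignKey(User.id, name="dag_run_note_user_fkey"),  # reference User.id instead of using "ab_user.id"
        nullable=True,
    )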

I can create a PR. I'm not familiar with SQLAlchemy, though, so scrutiny is welcome.

Kache added a commit to Kache/airflow that referenced this issue Nov 13, 2023
It seems DagRun gets loaded before User, resulting in a
sqlalchemy.exc.NoReferencedTableError (at least when running `airflow
tasks test DAG_ID TASK_ID`) when defining DagRun's foreign key to the
still nonexistent User table.

Referencing the column object instead of using str establishes proper
import ordering and fixes the issue.

Fix: apache#34109
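
The mechanism described in the commit message can be reproduced with plain SQLAlchemy, outside Airflow. This is a sketch under assumptions: the two tables are simplified stand-ins that reuse the names from the traceback, and the real models have more columns. It shows that a string-based `ForeignKey` target is resolved lazily and fails while the target table is missing from the `MetaData`, then resolves once that table has been registered.

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table
from sqlalchemy.exc import NoReferencedTableError

metadata = MetaData()

# String-based target, as in the failing definition. Nothing is checked here.
dag_run_note = Table(
    "dag_run_note",
    metadata,
    Column("dag_run_id", Integer, primary_key=True),
    Column(
        "user_id",
        Integer,
        ForeignKey("ab_user.id", name="dag_run_note_user_fkey"),
        nullable=True,
    ),
)
fk = next(iter(dag_run_note.c.user_id.foreign_keys))

# Resolution happens on first access and fails while "ab_user" is unknown.
try:
    fk.column
    failed_before_user_table = False
except NoReferencedTableError:
    failed_before_user_table = True

# Registering the target table stands in for importing the User model first.
Table("ab_user", metadata, Column("id", Integer, primary_key=True))

# The same foreign key now resolves.
resolved_table = fk.column.table.name
```

Passing a column object to `ForeignKey` instead of a string forces the target table to exist before the referencing table can be defined at all, which is the import ordering the commit relies on.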
@Kache
Contributor

Kache commented Nov 14, 2023

In #35614, it was discussed that there is already a feature coming down the line that'll have a proper fix.

In the short term, the maintainers would accept a PR that removes the dag_run_note_user_fkey and task_instance_note_user_fkey foreign keys. Unfortunately, I don't have the time to figure out how to create the migration to do that at this time.

In the immediate term, the workaround should suffice.
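
For anyone picking up the short-term fix, here is a rough sketch of what the body of such a migration could look like. It is not the migration the maintainers merged. The constraint names come from this thread; the `task_instance_note` table name is inferred from its constraint name and is an assumption. `RecordingOp` is a hypothetical dry-run stand-in, so the sketch can be exercised without Alembic or a database.

```python
"""Sketch of a migration body that drops the two note-to-user foreign keys."""

# Constraint names are from the thread. The "task_instance_note" table name is
# inferred from its constraint name and is an ASSUMPTION.
NOTE_USER_FKEYS = (
    ("dag_run_note", "dag_run_note_user_fkey"),
    ("task_instance_note", "task_instance_note_user_fkey"),
)


def drop_note_user_fkeys(op):
    """Drop each foreign key through Alembic's batch mode.

    `op` is expected to behave like `alembic.op`. Batch mode is used because
    SQLite cannot drop a constraint with a plain ALTER TABLE.
    """
    for table_name, constraint_name in NOTE_USER_FKEYS:
        with op.batch_alter_table(table_name) as batch_op:
            batch_op.drop_constraint(constraint_name, type_="foreignkey")


class RecordingOp:
    """Hypothetical dry-run stand-in for `alembic.op` that only records calls."""

    def __init__(self):
        self.dropped = []
        self._table = None

    def batch_alter_table(self, table_name):
        self._table = table_name
        return self  # acts as both the context manager and the batch object

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False

    def drop_constraint(self, constraint_name, type_=None):
        self.dropped.append((self._table, constraint_name, type_))


recorder = RecordingOp()
drop_note_user_fkeys(recorder)
```

In a real migration file, `upgrade()` would call `drop_note_user_fkeys(op)` after `from alembic import op`. The revision identifiers and a matching `downgrade()` are left out here.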

@brki
Contributor

brki commented Aug 14, 2024

I don't see this error anymore on 2.9.3 (using an image based on the airflow:slim-2.9.3-python3.12 docker image).

@potiuk potiuk closed this as completed Aug 14, 2024
@potiuk
Member

potiuk commented Aug 14, 2024

Closing then.

@phyk

phyk commented Aug 21, 2024

This issue is still present. I am on Airflow 2.9.3 and run dag.test() against a SQLite database. The database is created correctly by airflow db init/migrate, but when I run a DAG a second time, the error reappears while the previous DAG run is being deleted.

Traceback (most recent call last):
  File "/airflow/dags/dag.py", line 313, in <module>
    dag.test(
  File "/var/lib/python3.12/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/airflow/models/dag.py", line 2890, in test
    dr: DagRun = _get_or_create_dagrun(
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/airflow/models/dag.py", line 4205, in _get_or_create_dagrun
    session.commit()
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/var/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/var/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 667, in execute
    util.preloaded.orm_persistence.delete_obj(
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 330, in delete_obj
    table_to_mapper = base_mapper._sorted_tables
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 1184, in __get__
    obj.__dict__[self.__name__] = result = self.fget(obj)
                                           ^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/mapper.py", line 3386, in _sorted_tables
    sorted_ = sql_util.sort_tables(
              ^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/sql/ddl.py", line 1217, in sort_tables
    for (t, fkcs) in sort_tables_and_constraints(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/sql/ddl.py", line 1289, in sort_tables_and_constraints
    filtered = filter_fn(fkc)
               ^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/sql/ddl.py", line 1207, in _skip_fn
    if skip_fn(fk):
       ^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/orm/mapper.py", line 3369, in skip
    dep = table_to_mapper.get(fk.column.table)
                              ^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 1113, in __get__
    obj.__dict__[self.__name__] = result = self.fget(obj)
                                           ^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/sql/schema.py", line 2532, in column
    return self._resolve_column()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/python3.12/site-packages/sqlalchemy/sql/schema.py", line 2543, in _resolve_column
    raise exc.NoReferencedTableError(
sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'

@brki
Contributor

brki commented Aug 21, 2024

@phyk That's interesting that you have the issue. Perhaps it's DB specific?

I was having the issue with PostgreSQL on Airflow 2.8.3, but after upgrading to 2.9.3 I no longer have it. I used the sample DAG from the "How to reproduce" section of this issue.

@Kache
Contributor

Kache commented Aug 24, 2024

FYI, I updated my posted workaround by removing the second import, because it prematurely imports many more Airflow modules such as airflow.configuration, which can be problematic due to all the import side effects.

8 participants