Scheduler executes tasks for temporary DagRuns created by the airflow tasks test command #35994

Open
1 of 2 tasks
smphhh opened this issue Dec 1, 2023 · 2 comments
Labels
area:Scheduler (including HA (high availability) scheduler), kind:bug (This is a clearly a bug), Stale Bug Report

smphhh commented Dec 1, 2023

Apache Airflow version

2.7.3

Also tested on 2.8.0b1

What happened

When the airflow tasks test command creates a temporary DagRun, the scheduler starts executing all tasks for that DagRun, including the one already being run by the test command.

What you think should happen instead

The scheduler shouldn't launch tasks for such temporary DagRuns, as this is totally unexpected for the user.

How to reproduce

Use the airflow tasks test command to launch a test run of a task with an execution date for which no DagRun exists. The command will then create a new temporary DagRun in the database, and if the DAG is active, the scheduler will start launching tasks for that run.

Note that this is more evident when testing a task that takes a while to complete, because the temporary DagRun is deleted after the tested task completes, which causes the task runs launched by the scheduler to stop.

Operating System

Linux 94b223524983 6.1.32-0-virt #1-Alpine SMP PREEMPT_DYNAMIC Mon, 05 Jun 2023 09:39:09 +0000 x86_64 x86_64 x86_64 GNU/Linux

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

I think one possible solution would be to create a new DagRun type for the temporary runs. The scheduler wouldn't launch tasks for such runs, as is the case with backfill runs, but unlike backfill runs (I believe) the temporary runs wouldn't affect the timing of new runs.
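
To make the idea concrete, here is a minimal sketch in plain Python. It is not Airflow's actual model or scheduler code: the RunType enum, its TEMPORARY member, the Run class, and runs_to_schedule are all hypothetical names made up for illustration. It only shows the scheduler skipping a dedicated run type the same way it would skip backfill runs.

```python
from dataclasses import dataclass
from enum import Enum


class RunType(str, Enum):
    """Hypothetical run types; not Airflow's real enum."""

    SCHEDULED = "scheduled"
    MANUAL = "manual"
    BACKFILL = "backfill"
    TEMPORARY = "temporary"  # assumed new type for runs created by `airflow tasks test`


# Run types whose tasks the scheduler must not launch in this sketch.
UNSCHEDULED_TYPES = frozenset({RunType.BACKFILL, RunType.TEMPORARY})


@dataclass(frozen=True)
class Run:
    """Stand-in for a DagRun row; only the fields the sketch needs."""

    run_id: str
    run_type: RunType


def runs_to_schedule(runs):
    """Return only the runs whose tasks the scheduler may launch."""
    return [run for run in runs if run.run_type not in UNSCHEDULED_TYPES]


runs = [
    Run("scheduled__2023-12-01", RunType.SCHEDULED),
    Run("manual__2023-12-01", RunType.MANUAL),
    Run("temporary__2023-12-01", RunType.TEMPORARY),
]
selected = [run.run_id for run in runs_to_schedule(runs)]
print(selected)  # the temporary run is filtered out
```

In a real change the filter would presumably live wherever the scheduler selects DagRuns to examine, and the grid view could apply the same check to hide temporary runs.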

Also, I think it would probably be clearer if the temporary runs weren't shown in the DAG grid view.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


smphhh added the labels area:core, kind:bug (This is a clearly a bug), and needs-triage (label for new issues that we didn't triage yet) on Dec 1, 2023
smphhh changed the title from "Scheduler executes tasks for temporary DagRuns created by airflow tasks test" to "Scheduler executes tasks for temporary DagRuns created by the airflow tasks test command" on Dec 1, 2023
brki (Contributor) commented Dec 1, 2023

Edit: never mind this comment, I now see #34109 reports what I mention here.

I also see that behaviour on 2.7.3.

I don't know if it's related, but now whenever I test a task from the CLI, regardless of whether the task finishes successfully, I see this at the end of the output:

sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'

[2023-12-01T13:19:30.791+0000] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sandbox_test, task_id=recreate_workdir, execution_date=20231201T131930, start_date=, end_date=20231201T131930
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 57, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 634, in task_test
    with create_session() as session:
  File "/usr/local/lib/python3.11/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 39, in create_session
    session.commit()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 667, in execute
    util.preloaded.orm_persistence.delete_obj(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 330, in delete_obj
    table_to_mapper = base_mapper._sorted_tables
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 1184, in __get__
    obj.__dict__[self.__name__] = result = self.fget(obj)
                                           ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/mapper.py", line 3386, in _sorted_tables
    sorted_ = sql_util.sort_tables(
              ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 1217, in sort_tables
    for (t, fkcs) in sort_tables_and_constraints(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 1289, in sort_tables_and_constraints
    filtered = filter_fn(fkc)
               ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py", line 1207, in _skip_fn
    if skip_fn(fk):
       ^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/mapper.py", line 3369, in skip
    dep = table_to_mapper.get(fk.column.table)
                              ^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 1113, in __get__
    obj.__dict__[self.__name__] = result = self.fget(obj)
                                           ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/schema.py", line 2532, in column
    return self._resolve_column()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/schema.py", line 2543, in _resolve_column
    raise exc.NoReferencedTableError(
sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'

RNHTTR added the label area:Scheduler (including HA (high availability) scheduler) and removed the labels area:core and needs-triage (label for new issues that we didn't triage yet) on Dec 29, 2023

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Please recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.
