BaseOperator.get_task_instances is broken due to order by TaskInstance.execution_date #20116

Closed
2 tasks done
PrincipalsOffice opened this issue Dec 7, 2021 · 5 comments
Labels
affected_version:2.2 Issues Reported for 2.2 area:core kind:bug This is a clearly a bug

Comments

@PrincipalsOffice

PrincipalsOffice commented Dec 7, 2021

Apache Airflow version

2.2.2 (latest released)

What happened

I was trying to call task.get_task_instances() but received an error sqlalchemy.exc.ArgumentError: SQL expression object expected, got object of type <class 'sqlalchemy.ext.associationproxy.ColumnAssociationProxyInstance'> instead.

Looking at the code, it's trying to order by TaskInstance.execution_date which is an association proxy. https://sourcegraph.com/github.com/apache/airflow@944dcfb/-/blob/airflow/models/baseoperator.py?L1252:21
If I remove the order_by clause, the function would work.

What you expected to happen

The function to return a list of TaskInstance objects

How to reproduce

Python 3.8.7 (default, Dec 22 2020, 16:05:35)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pendulum
>>> from airflow.operators.dummy import DummyOperator
>>> from airflow import DAG
>>> dag = DAG(
...     "test",
...     start_date=pendulum.parse("20211201"),
... )
>>> with dag:
...     task1 = DummyOperator(task_id="test_task")
...
>>> task = dag.get_task("test_task")
>>> task.get_task_instances(start_date="20211201", end_date="20211201")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1257, in get_task_instances
    session.query(TaskInstance)
  File "<string>", line 2, in order_by
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/sqlalchemy/orm/base.py", line 227, in generate
    fn(self, *args[1:], **kw)
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 1932, in order_by
    criterion = self._adapt_col_list(criterion)
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 336, in _adapt_col_list
    return [
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 338, in <listcomp>
    expression._literal_as_label_reference(o), True, True
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 4678, in _literal_as_label_reference
    return _literal_as_text(element)
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 4721, in _literal_as_text
    return _literal_as(element, _no_text_coercion)
  File "/home/atl/venv_airflow_2.2.2/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 4711, in _literal_as
    raise exc.ArgumentError(
sqlalchemy.exc.ArgumentError: SQL expression object expected, got object of type <class 'sqlalchemy.ext.associationproxy.ColumnAssociationProxyInstance'> instead

Operating System

Debian GNU/Linux

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@PrincipalsOffice PrincipalsOffice added area:core kind:bug This is a clearly a bug labels Dec 7, 2021
@PrincipalsOffice PrincipalsOffice changed the title TaskInstance.execution_date still being referenced in queries TaskInstance.execution_date in order_by clause will break queries. Dec 7, 2021
@PrincipalsOffice PrincipalsOffice changed the title TaskInstance.execution_date in order_by clause will break queries. BaseOperator.get_task_instances is broken due to order by TaskInstance.execution_date Dec 7, 2021
@ashb
Member

ashb commented Dec 7, 2021

This is meant to be a backwards compatibility shim, so that ti.execution_date on an instance, and TaskInstance.execution_date in a query both behave as expected.

Can you provide a reproduction case for this?

@PrincipalsOffice
Author

@ashb I have updated the "How to reproduce" section with a reproducible snippet.

@uranusjr
Member

uranusjr commented Dec 8, 2021

> This is meant to be a backwards compatibility shim

Not really. The function has not been touched (except to apply Black) in three years and is not used anywhere in the code base, so it eluded attention during the run_id migration.


Feel free to submit a pull request for this. The fix should join TaskInstance to DagRun on run_id and order the result by DagRun.execution_date instead. You should be able to find some examples in the code base by searching for order_by(DagRun.execution_date). A test would be needed as well.
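
For illustration only, the shape of that join can be sketched with the standard-library sqlite3 module. The two tables below are simplified, hypothetical stand-ins for Airflow's dag_run and task_instance tables, and the function is not Airflow's implementation. Joining on dag_id as well as run_id is an assumption made here, on the basis that a run_id is only unique within one DAG.

```python
import sqlite3

# Simplified, hypothetical tables holding only the columns needed for the join.
# Airflow's real schema has many more columns and constraints.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_run (
        dag_id TEXT, run_id TEXT, execution_date TEXT,
        PRIMARY KEY (dag_id, run_id)
    );
    CREATE TABLE task_instance (
        dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT
    );
    """
)
# Rows are inserted out of date order so the ORDER BY is actually exercised.
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("test", "run_b", "2021-12-02T00:00:00+00:00"),
        ("test", "run_a", "2021-12-01T00:00:00+00:00"),
    ],
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("test", "test_task", "run_b", "success"),
        ("test", "test_task", "run_a", "success"),
    ],
)


def get_task_instances(conn, dag_id, task_id, start_date, end_date):
    """Return (run_id, execution_date) rows ordered by the run's execution date."""
    return conn.execute(
        """
        SELECT ti.run_id, dr.execution_date
        FROM task_instance AS ti
        JOIN dag_run AS dr
          ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
        WHERE ti.dag_id = ? AND ti.task_id = ?
          AND dr.execution_date >= ? AND dr.execution_date <= ?
        ORDER BY dr.execution_date
        """,
        (dag_id, task_id, start_date, end_date),
    ).fetchall()


rows = get_task_instances(
    conn,
    "test",
    "test_task",
    "2021-12-01T00:00:00+00:00",
    "2021-12-02T00:00:00+00:00",
)
```

The point of the sketch is that the ordering column lives on the run table, so the query has to join to it before ordering rather than ordering by an attribute of the task instance row.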

@ashb
Member

ashb commented Dec 8, 2021

Oh, we updated DAG.get_task_instances, but forgot to update BaseOperator.get_task_instances as well

@eladkal eladkal added the affected_version:2.2 Issues Reported for 2.2 label Dec 24, 2021
@eladkal
Contributor

eladkal commented Jul 7, 2022

Duplicate of #21656; the issue was fixed in #21705.

Note that the reproduction example provided will not work even now.
task.get_task_instances(start_date="20211201", end_date="20211201") is wrong: the values cannot be strings.

start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,

The DAG

import pendulum
from airflow.operators.dummy import DummyOperator
from airflow import DAG

dag = DAG(
    "20116",
    start_date=pendulum.parse("20211201"),
)
with dag:
    task1 = DummyOperator(task_id="test_task")

task = dag.get_task("test_task")
task.get_task_instances(start_date="20211201", end_date="20211201")

Will yield:

Broken DAG: [/files/dags/20116.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/type_api.py", line 1487, in process
    return process_param(value, dialect)
  File "/opt/airflow/airflow/utils/sqlalchemy.py", line 68, in process_bind_param
    raise TypeError('expected datetime.datetime, not ' + repr(value))
TypeError: expected datetime.datetime, not '20211201'

You can change it to:
task.get_task_instances(start_date=pendulum.parse("20211201"), end_date=pendulum.parse("20211201"))
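
If pendulum is not at hand, the same idea can be sketched with only the standard library. The helper name to_utc_datetime is hypothetical and not part of Airflow; it assumes the compact "YYYYMMDD" string form used in this issue and treats the result as UTC.

```python
from datetime import datetime, timezone


def to_utc_datetime(value):
    """Hypothetical helper: turn a 'YYYYMMDD' string or a datetime into an aware UTC datetime."""
    if isinstance(value, datetime):
        # Assume naive datetimes are already in UTC; convert aware ones to UTC.
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)
    if isinstance(value, str):
        # Only the compact date form from this issue is handled.
        return datetime.strptime(value, "%Y%m%d").replace(tzinfo=timezone.utc)
    raise TypeError("expected datetime.datetime or 'YYYYMMDD' string, not " + repr(value))


start = to_utc_datetime("20211201")
end = to_utc_datetime("20211201")
```

The resulting values could then be passed as start_date=start and end_date=end in place of the raw strings.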

@eladkal eladkal closed this as not planned Jul 7, 2022