Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecations in airflow.models.taskreschedule #41808

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 0 additions & 67 deletions airflow/models/taskreschedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc, desc, select, text
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
import datetime

from sqlalchemy.orm import Query, Session
from sqlalchemy.sql import Select

from airflow.models.taskinstance import TaskInstance
Expand Down Expand Up @@ -133,66 +129,3 @@ def stmt_for_task_instance(
)
.order_by(desc(cls.id) if descending else asc(cls.id))
)

@staticmethod
@provide_session
def query_for_task_instance(
task_instance: TaskInstance,
descending: bool = False,
session: Session = NEW_SESSION,
try_number: int | None = None,
) -> Query:
"""
Return query for task reschedules for a given the task instance (deprecated).

:param session: the database session object
:param task_instance: the task instance to find task reschedules for
:param descending: If True then records are returned in descending order
:param try_number: Look for TaskReschedule of the given try_number. Default is None which
looks for the same try_number of the given task_instance.
"""
warnings.warn(
"Using this method is no longer advised, and it is expected to be removed in the future.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)

if try_number is None:
try_number = task_instance.try_number

TR = TaskReschedule
qry = session.query(TR).filter(
TR.dag_id == task_instance.dag_id,
TR.task_id == task_instance.task_id,
TR.run_id == task_instance.run_id,
TR.map_index == task_instance.map_index,
TR.try_number == try_number,
)
if descending:
return qry.order_by(desc(TR.id))
else:
return qry.order_by(asc(TR.id))

@staticmethod
@provide_session
def find_for_task_instance(
task_instance: TaskInstance,
session: Session = NEW_SESSION,
try_number: int | None = None,
) -> list[TaskReschedule]:
"""
Return all task reschedules for the task instance and try number, in ascending order.

:param session: the database session object
:param task_instance: the task instance to find task reschedules for
:param try_number: Look for TaskReschedule of the given try_number. Default is None which
looks for the same try_number of the given task_instance.
"""
warnings.warn(
"Using this method is no longer advised, and it is expected to be removed in the future.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
return session.scalars(
TaskReschedule.stmt_for_task_instance(ti=task_instance, try_number=try_number, descending=False)
).all()
8 changes: 8 additions & 0 deletions newsfragments/41808.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Removed deprecations in ``airflow.models.taskreschedule``.
jscheffl marked this conversation as resolved.
Show resolved Hide resolved

Removed methods:

- ``query_for_task_instance()``
- ``find_for_task_instance()``

Note: there are no replacements, if data is needed, you need to query via sqlalchemy.