Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import psutil
import sqlalchemy.exc
from celery import maybe_patch_concurrency # type: ignore[attr-defined]
from celery import maybe_patch_concurrency
from celery.app.defaults import DEFAULT_TASK_LOG_FMT
from celery.signals import after_setup_logger
from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def send_task_to_executor(

# The type is right for the version, but the type cannot be defined correctly for Airflow 2 and 3
# concurrently;
return key, args, result # type: ignore[return-value]
return key, args, result


def fetch_celery_task_state(async_result: AsyncResult) -> tuple[str, str | ExceptionWithTraceback, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ def _task_event_logs(self, value):
def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
"""Return queued tasks from celery and kubernetes executor."""
queued_tasks = self.celery_executor.queued_tasks.copy()
queued_tasks.update(self.kubernetes_executor.queued_tasks) # type: ignore[arg-type]
queued_tasks.update(self.kubernetes_executor.queued_tasks)

return queued_tasks # type: ignore[return-value]
return queued_tasks

@queued_tasks.setter
def queued_tasks(self, value) -> None:
Expand Down