Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
executor_config = w.ti.executor_config or {}

del self.queued_tasks[key]
self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) # type: ignore[arg-type]
self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)
self.running.add(key)

def sync(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def execute(self, context: Context):
def execute_deferrable(self):
self.defer(
trigger=KubernetesJobTrigger(
job_name=self.job.metadata.name, # type: ignore[union-attr]
job_namespace=self.job.metadata.namespace, # type: ignore[union-attr]
pod_name=self.pod.metadata.name, # type: ignore[union-attr]
pod_namespace=self.pod.metadata.namespace, # type: ignore[union-attr]
job_name=self.job.metadata.name,
job_namespace=self.job.metadata.namespace,
pod_name=self.pod.metadata.name,
pod_namespace=self.pod.metadata.namespace,
base_container_name=self.base_container_name,
kubernetes_conn_id=self.kubernetes_conn_id,
cluster_context=self.cluster_context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar # type: ignore[attr-defined]
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
EMPTY_XCOM_RESULT,
OnFinishAction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Get current job status and yield a TriggerEvent."""
if self.get_logs or self.do_xcom_push:
pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Get current pod status and yield a TriggerEvent."""
self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
from airflow.sdk import BaseHook, BaseOperator
else:
from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef]
from airflow.models import BaseOperator # type: ignore[no-redef]
from airflow.utils.xcom import XCOM_RETURN_KEY # type: ignore[no-redef]
from airflow.models import BaseOperator
from airflow.utils.xcom import XCOM_RETURN_KEY

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseSensorOperator
Expand Down