Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-44 Migrate Job to Internal API #34026

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from flask import Response

from airflow.jobs.job import Job, most_recent_job
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.utils.session import create_session

Expand Down Expand Up @@ -51,6 +52,12 @@ def _initialize_map() -> dict[str, Callable]:
DagModel.get_current,
DagFileProcessorManager.clear_nonexistent_import_errors,
DagWarning.purge_inactive_dag_warnings,
Job._add_to_db,
Job._fetch_from_db,
Job._kill,
Job._update_heartbeat,
Job._update_in_db,
most_recent_job,
MetastoreBackend._fetch_connection,
MetastoreBackend._fetch_variable,
XCom.get_value,
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from airflow.models.taskinstance import TaskInstanceKey


class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
class BackfillJobRunner(BaseJobRunner, LoggingMixin):
"""
A backfill job runner consists of a dag or subdag for a specific time range.

Expand Down
12 changes: 5 additions & 7 deletions airflow/jobs/base_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Generic, TypeVar
from typing import TYPE_CHECKING

from airflow.utils.session import NEW_SESSION, provide_session

Expand All @@ -27,22 +27,20 @@
from airflow.jobs.job import Job
from airflow.serialization.pydantic.job import JobPydantic

J = TypeVar("J", "Job", "JobPydantic", "Job | JobPydantic")


class BaseJobRunner(Generic[J]):
class BaseJobRunner:
"""Abstract class for job runners to derive from."""

job_type = "undefined"

def __init__(self, job: J) -> None:
def __init__(self, job: Job) -> None:
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
job.job_type = self.job_type
self.job: J = job
self.job: Job = job

def _execute(self) -> int | None:
"""
Expand All @@ -65,7 +63,7 @@ def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:

@classmethod
@provide_session
def most_recent_job(cls, session: Session = NEW_SESSION) -> Job | None:
def most_recent_job(cls, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
"""Return the most recent job of this type, if any, based on last heartbeat received."""
from airflow.jobs.job import most_recent_job

Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/dag_processor_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def empty_callback(_: Any) -> None:
pass


class DagProcessorJobRunner(BaseJobRunner[Job], LoggingMixin):
class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
"""
DagProcessorJobRunner is a job runner that runs a DagFileProcessorManager processor.

Expand Down
Loading