
AIP-44 Migrate Job to Internal API #34026

Merged
merged 3 commits into from
Sep 22, 2023
Conversation

mhenc
Contributor

@mhenc mhenc commented Sep 1, 2023

This PR migrates Job's methods to the Internal API.
It also adds some methods to JobPydantic to make it usable interchangeably with Job.
However, not all methods are migrated: the *JobRunner classes still operate on the ORM Job object. This is required because Job contains some "callback" methods, which can't be easily serialized and executed in the Internal API; instead, those callbacks may simply make additional Internal API calls.
JobPydantic is mostly used as a transport object.

closes: #29315
related: #30298
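
The general shape of this pattern - a method that runs its DB logic directly when the process has database access, and is routed over the Internal API otherwise - can be sketched as follows. This is a self-contained illustration, not the actual Airflow code; `USE_INTERNAL_API`, `JOBS`, and `kill_job` are placeholder names:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

# In real Airflow this flag would be derived from configuration: whether
# the current process is allowed to access the metadata DB directly.
USE_INTERNAL_API = False


def internal_api_call(func: Callable[..., Any]) -> Callable[..., Any]:
    """Run locally, or (in DB-less components) route over the Internal API."""
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        if USE_INTERNAL_API:
            # Placeholder: the real decorator serializes the arguments
            # (e.g. via transport models such as JobPydantic) and sends
            # them to the internal API server, which runs the DB logic.
            raise NotImplementedError("would be an RPC to the API server")
        return func(*args, **kwargs)
    return wrapper


@dataclass
class Job:
    id: int
    state: str = "running"


# Module-level "database" standing in for the ORM session in this sketch.
JOBS: dict[int, Job] = {}


@internal_api_call
def kill_job(job_id: int) -> Job:
    # The real method fetches the ORM Job, sets state/end_date and commits.
    job = JOBS[job_id]
    job.state = "failed"
    return job
```

The key point is that callers never branch on where the code runs; only the decorator knows whether the body executes locally or server-side.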



@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler area:serialization area:Triggerer labels Sep 1, 2023
@mhenc mhenc force-pushed the job_aip44 branch 3 times, most recently from 6d9ee1e to e21782f Compare September 1, 2023 21:52
@mhenc mhenc force-pushed the job_aip44 branch 2 times, most recently from a23c54a to bb078f1 Compare September 1, 2023 22:20
@mhenc mhenc marked this pull request as ready for review September 1, 2023 22:21
@mhenc mhenc force-pushed the job_aip44 branch 2 times, most recently from 4186480 to 0f6ca78 Compare September 4, 2023 13:42
session.merge(job)
session.commit()

Job._kill(job_id=self.id, session=session)
Member

@potiuk potiuk Sep 4, 2023


This is a slight change in the semantics - the job's end_date is now updated after on_kill - but looking at the implementations, that does not matter.

Contributor Author


Right, but please notice that end_date was not set on self but on the newly fetched object - so self.on_kill() didn't have access to job.end_date.
That's why I believe it's safe to move the end_date update to after it.

@@ -190,11 +182,10 @@ def heartbeat(

try:
# This will cause it to load from the db
session.merge(self)
self._merge_from(Job._fetch_from_db(self, session))
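
The replaced line copies freshly fetched field values onto the in-memory object instead of relying on `session.merge`, which requires a live ORM session. Roughly, the idea can be sketched like this (placeholder field names, not the actual implementation):

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass
class JobPydantic:
    """Transport-object stand-in; the real one is a Pydantic model."""
    id: int
    state: str
    latest_heartbeat: float


@dataclass
class Job:
    id: int
    state: str
    latest_heartbeat: float

    def _merge_from(self, fetched: JobPydantic | Job) -> None:
        # Copy freshly fetched values onto the in-memory object. Unlike
        # session.merge(self), this works whether `fetched` came from a
        # local ORM session or over the Internal API as a JobPydantic.
        for f in fields(JobPydantic):
            setattr(self, f.name, getattr(fetched, f.name))
```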
Member


I think we need to meet for an interactive brainstorming session and discuss what's going on here. I believe there were some non-obvious behaviours of heartbeat that we should replicate (especially around edge cases).

I tried recently to understand what's going on in this method and explain it here: #33689 (reply in thread)

And I think it's not entirely obvious if we have the same now.

Let's discuss on slack @mhenc.

Contributor Author


@potiuk any update on this?

@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
Member

@uranusjr uranusjr Sep 6, 2023


I think we have an effort consolidating this to import datetime instead. And there’s another one to move this kind of import into the if TYPE_CHECKING block.

Contributor Author


Ok. Done.
Unfortunately I can't move it under TYPE_CHECKING in pydantic/job.py as Pydantic doesn't seem to like it.

airflow/jobs/job.py (outdated review thread, resolved)
@potiuk
Member

potiuk commented Sep 22, 2023

LGTM after deeper review.

@potiuk potiuk merged commit 7a5b6a3 into apache:main Sep 22, 2023
43 checks passed
@potiuk
Member

potiuk commented Sep 22, 2023

@mhenc -> one thing to remember that we still have to do as a follow-up for AIP-44: we need to implement the heartbeat callback in an AIP-44-compliant way.

In the current implementation of AIP-44 the callback is still called on the client side (i.e. in the local task job), and it performs some DB operations - refreshing the task instance and dag run from the DB. But it also does some "local" process management (terminating the running task if the state changed, and also handling the "impersonation" case, where an additional parent process is created in the process of switching to the impersonated user) - so this method should likely be split into "retrieving state from the DB" (with an Internal API call) and reacting to the state change.

Or maybe we could refactor the heartbeat_callback approach and make a "dedicated" local task job heartbeat that does it in a single Internal API call: both update the heartbeat status AND retrieve the state, returning it so that processes can be killed in reaction to an external DB state change.

cc: @bjankie1 - I think it would be great if you two thought about it and proposed an approach that would also be good regarding potential optimisation of heartbeat in the future.

Currently the callback is as follows.

    @provide_session
    def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
        """Self destruct task if state has been moved away from running externally."""
        if self.terminating:
            # ensure termination if processes are created later
            self.task_runner.terminate()
            return

        self.task_instance.refresh_from_db()
        ti = self.task_instance

        if ti.state == TaskInstanceState.RUNNING:
            fqdn = get_hostname()
            same_hostname = fqdn == ti.hostname
            if not same_hostname:
                self.log.error(
                    "The recorded hostname %s does not match this instance's hostname %s",
                    ti.hostname,
                    fqdn,
                )
                raise AirflowException("Hostname of job runner does not match")
            current_pid = self.task_runner.get_process_pid()
            recorded_pid = ti.pid
            same_process = recorded_pid == current_pid

            if recorded_pid is not None and (ti.run_as_user or self.task_runner.run_as_user):
                # when running as another user, compare the task runner pid to the parent of
                # the recorded pid because user delegation becomes an extra process level.
                # However, if recorded_pid is None, pass that through as it signals the task
                # runner process has already completed and been cleared out. `psutil.Process`
                # uses the current process if the parameter is None, which is not what is intended
                # for comparison.
                recorded_pid = psutil.Process(ti.pid).ppid()
                same_process = recorded_pid == current_pid

            if recorded_pid is not None and not same_process and not IS_WINDOWS:
                self.log.warning(
                    "Recorded pid %s does not match the current pid %s", recorded_pid, current_pid
                )
                raise AirflowException("PID of job runner does not match")
        elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
            if ti.state == TaskInstanceState.SKIPPED:
                # A DagRun timeout will cause tasks to be externally marked as skipped.
                dagrun = ti.get_dagrun(session=session)
                execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
                if ti.task.dag is not None:
                    dagrun_timeout = ti.task.dag.dagrun_timeout
                else:
                    dagrun_timeout = None
                if dagrun_timeout and execution_time > dagrun_timeout:
                    self.log.warning("DagRun timed out after %s.", execution_time)

            # potential race condition, the _run_raw_task commits `success` or other state
            # but task_runner does not exit right away due to slow process shutdown or any other reasons
            # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
            if self._state_change_checks >= 1:  # defer to next round of heartbeat
                self.log.warning(
                    "State of this instance has been externally set to %s. Terminating instance.", ti.state
                )
                self.terminating = True
            self._state_change_checks += 1

Labels
AIP-44 Airflow Internal API area:Scheduler including HA (high availability) scheduler area:serialization area:Triggerer changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)
Development

Successfully merging this pull request may close these issues.

AIP-44 Migrate BaseJob.heartbeat to InternalAPI
4 participants