Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for abort notification #1172

Merged
merged 1 commit into from
Sep 6, 2024

Conversation

onlyann
Copy link
Contributor

@onlyann onlyann commented Aug 25, 2024

Successful PR Checklist:

  • Tests
    • (not applicable?)
  • Documentation
    • (not applicable?)

PR label(s):

Context

So far, the only use of LISTEN/NOTIFY has been to let the worker know that a new job is ready to be processed.

This extends listen_notify to accept different types of notification through using the payload:

  • job_inserted: the existing notification
  • abort_job_requested: when a running job is requested to be aborted.

Instead of making explicit calls to the database every time a job calls should_abort, the worker uses the same mechanism than for detecting new jobs, a combination of Postgres notification and polling.

Other changes

The edge case of retrying a job to be aborted is handled at the database level instead of the worker.

Next steps

Once this change lands in v3, it will help with cancellation during the shutdown event and #1084

@github-actions github-actions bot added PR type: breaking 💥 Contains breaking changes PR type: feature ⭐️ Contains new features labels Aug 25, 2024
Copy link

github-actions bot commented Aug 25, 2024

Coverage report

Click to see where and how coverage changed

FileStatementsMissingCoverageCoverage
(new stmts)
Lines missing
  procrastinate
  connector.py
  job_context.py
  jobs.py
  manager.py
  psycopg_connector.py
  testing.py
  utils.py
  worker.py 363-364
  procrastinate/contrib/aiopg
  aiopg_connector.py
Project Total  

This report was generated by python-coverage-comment-action

@medihack
Copy link
Member

Excellent. I will have a look at it tomorrow 🙂. I think it's a good idea to handle more stuff at the database level (as long as we can still test it).

@onlyann
Copy link
Contributor Author

onlyann commented Aug 26, 2024

Excellent. I will have a look at it tomorrow 🙂. I think it's a good idea to handle more stuff at the database level (as long as we can still test it).

It also removes some race condition with the previous approach.

Glad you had an acceptance test written against it already or I will have surely missed it.

@medihack
Copy link
Member

Excellent. I will have a look at it tomorrow 🙂. I think it's a good idea to handle more stuff at the database level (as long as we can still test it).

It also removes some race condition with the previous approach.

Glad you had an acceptance test written against it already or I will have surely missed it.

Cool, let me know when it's ready for review (still a draft currently and some connector excluded from tests/acceptance/test_async.py). But looks pretty good at first glance.

@onlyann onlyann marked this pull request as ready for review August 28, 2024 08:23
@onlyann onlyann requested a review from a team as a code owner August 28, 2024 08:23
database and might flood the database. Ensure you do it only sometimes and
not from too many parallel tasks.
:::
The worker receives a Postgres notification every time a running job is requested to abort, unless `listen_notify=False`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding something like "Internally, the worker receives ..."? It would make it clearer that this is nothing the user has to handle but something that works behind the curtain.

We should also update the docstrings of listen_notify and polling_interval that this is not just for requesting jobs.

So, should_abort() is now used for async and sync tasks. Correct? Haven't you originally planned to cancel the async tasks automatically on abort? (The thing I was a bit ambivalent about). Just wondering.

Copy link
Contributor Author

@onlyann onlyann Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, should_abort() is now used for async and sync tasks. Correct?

Correct

Haven't you originally planned to cancel the async tasks automatically on abort?

Yes. I aim to keep this PR focused on notification support and open another about issuing an asyncio cancel on async tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I amended the documentation.
I also noticed I need to rewrite some of the discussion section around the worker. I will however leave that part for another PR as it is not related to abort notification.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but maybe we can have updated docstrings for listen_notify and polling_interval.

Currently, it says for polling_interval:

polling_interval : ``float``
    Indicates the maximum duration (in seconds) the worker waits between
    each database job poll. Raising this parameter can lower the rate at which
    the worker makes queries to the database for requesting jobs.
    (defaults to 5.0)

How about:

polling_interval : ``float``
    Indicates the maximum duration (in seconds) the worker waits between
    each database job poll. Raising this parameter can lower the rate at which
    the worker makes queries to the database for requesting jobs or checking
    for job abortion requests.
    (defaults to 5.0)

And for listen_notify it says:

listen_notify : ``bool``
    If ``True``, the worker will dedicate a connection from the pool to
    listening to database events, notifying of newly available jobs.
    If ``False``, the worker will just poll the database periodically
    (see ``polling_interval``). (defaults to ``True``)

How about:

listen_notify : ``bool``
    If ``True``, the worker will dedicate a connection from the pool to
    listening to database events, notifying of newly available jobs, or
    job abortion requests.
    If ``False``, the worker will just poll the database periodically
    (see ``polling_interval``). (defaults to ``True``)

Copy link
Member

@ewjoachim ewjoachim Aug 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(while we're at it,

If ``False``, the worker will just poll the database periodically

might be misleading in implying that this doesn't happen with True
Maybe:

If False, the worker won't listen to database events, so it will only
know about new jobs & abortion requests through the polling mechanism

?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushing some update on this from your feedbacks.
Let me know what you think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the bullet point Fetching updates for running jobs mean? Does the polling more than checking for new jobs and abortion requests?

procrastinate/worker.py Outdated Show resolved Hide resolved
@onlyann onlyann requested a review from medihack August 31, 2024 12:25
Copy link
Member

@ewjoachim ewjoachim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that was awesome :) Still plenty of comments, but definitly going the right way. Thank you!

procrastinate/job_context.py Outdated Show resolved Hide resolved
Comment on lines -87 to -80
async def should_abort_async(self) -> bool:
assert self.job.id
job_id = self.job.id
return await self.app.job_manager.get_job_abort_requested_async(job_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense that should_abort is able to trigger something, especially if listen_notify is False. Consequently, I think it makes sense if we keep this function, even if it currently only returns static info that doesn't need async. I'd rather the async users keep using the async method rather than have them change to the sync method, to later re-change to the async method if we change something.

(EDIT: well, the polling_interval actually handles the listen_notify=False scenario but... I still think it's worth using a dedicated async func for now. Probably worth a comment too)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about extending the Job class instead to have abort_requested. This way, the existing list_job operation covers it and there is no need to have distinct operations for every piece of state?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to make sense. I agree!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I might not have understood your proposal.

I think it's worth keeping a should_abort_async on the context just because we advertised it before because it's really not going to get in the way of maintenance, independently from how we can make the info available on the job. It's literally:

async def should_abort_async(self):
    # As it stands, `should_abort` doesn't do I/O as of now.
    return self.should_abort()

I don't see a case where this get in our way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given how short lived that function is, how simple it is to migrate to the non async version and the opportunity to have breaking changes as part of v3, it seems reasonable to remove it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, won't fight you on this.

procrastinate/jobs.py Outdated Show resolved Hide resolved
@@ -12,6 +12,12 @@
QUEUEING_LOCK_CONSTRAINT = "procrastinate_jobs_queueing_lock_idx"


class Notify(Protocol):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You define 2 class Notify(Protocol): (the other one is in connector.py. Is this intended ? It looks like a remnant of refactoring.

If it's intended, they should not be named the same, and potentially be defined next to each other so it's easier to tell the difference (though circular imports will be annoying)

Also, I believe that a more descriptive name than Notify could help understand the code better. OnNotificationCallback ?

Copy link
Contributor Author

@onlyann onlyann Aug 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not exactly the same.
The one in connector has payload: string because the connector doesn't concern itself with how to parse the payload while the manager handles parsing the payload to expose a typed notification.

Copy link
Contributor Author

@onlyann onlyann Aug 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change the name of the manager one to NotificationCallback.
FYI, Notify is the name used by psycopg

priority = COALESCE(new_priority, priority),
queue_name = COALESCE(new_queue_name, queue_name),
lock = COALESCE(new_lock, lock)
SET status = CASE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads like it would make sense to turn the "CASE" around:

CASE abort_requested
    WHEN TRUE THEN
        UPDATE procrastinate_jobs SET ...
    WHEN FALSE THEN
        UPDATE procrastinate_jobs SET ...
END

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not following the example.
Where does abort_requested come from when it is outside the UPDATE statement?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm you're right. Though maybe doing a quick SELECT to get the abort_requested might be enough. It's not very important but the current code is quite hard to read and I believe will be harder to maintain.

Copy link
Contributor Author

@onlyann onlyann Sep 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is much better to have something like below. I am happy to change if you feel it is more maintainable, or maybe this could be simplified even further?

CREATE OR REPLACE FUNCTION procrastinate_retry_job(
    job_id bigint,
    retry_at timestamp with time zone,
    new_priority integer,
    new_queue_name varchar,
    new_lock varchar
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    _job_id bigint;
    _abort_requested boolean;
BEGIN
    -- First, check if the job exists and get its abort_requested status
    SELECT id, abort_requested 
    INTO _job_id, _abort_requested
    FROM procrastinate_jobs
    WHERE id = job_id AND status = 'doing';

    IF _job_id IS NULL THEN
        RAISE EXCEPTION 'Job was not found or not in "doing" status (job id: %)', job_id;
    END IF;

    -- Update the job based on abort_requested status
    UPDATE procrastinate_jobs
    SET
        (status, attempts, scheduled_at, priority, queue_name, lock) =
        CASE
            WHEN NOT _abort_requested THEN
                ('todo'::procrastinate_job_status,
                 attempts + 1,
                 retry_at,
                 COALESCE(new_priority, priority),
                 COALESCE(new_queue_name, queue_name),
                 COALESCE(new_lock, lock))
            ELSE
                ('failed'::procrastinate_job_status,
                 attempts,
                 scheduled_at,
                 priority,
                 queue_name,
                 lock)
        END
    WHERE id = job_id;
END;
$$;

Comment on lines 368 to 372
running_job_ids = {
c.job.id for c in self._running_jobs.values() if c.job.id
}

self._job_ids_to_abort.update(running_job_ids.intersection(job_ids))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm very slightly worried that there are 2 independent places where we react to changes in the abort job list. I think I'd feel slightly better if there was a single function (well, coroutine) handling this.

That said, while the work we do on self._job_ids_to_abort is seemingly identical (replace a set vs update a set), it will be different when we actually abort the tasks, because we'll need to identify what jobs should be aborted which we haven't aborted already.

So all in all... Maybe we'll see how to refactor when we add the abortion itself. But I'd prefer if we don't have 2 very related pieces of code that need to do roughly the same thing sitting hundreds of lines appart.

Another solution could be having a queue and a coroutine awaiting on that queue, but that marginally doesn't solve the issue that we're going to do slightly different things on the notification vs the poll.

Trying to put a brain cell on that. It's not blocking for merging this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored it to consolidate the logic.

side_tasks = [asyncio.create_task(self.periodic_deferrer(), name="deferrer")]
if self.wait and self.listen_notify:
side_tasks = [
asyncio.create_task(self.periodic_deferrer(), name="deferrer"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should periodic_deferrer start with a _ too ? Is there a rationale on what gets a _ vs on on a class that's itself internal ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about Python. Coming from a different background, I see _ being used when it is not meant to be used outside the class, whether the class is internal or not.
Changing to _periodic_deferrer but happy to go with whatever convention you prefer here.

if self.wait and self.listen_notify:
side_tasks = [
asyncio.create_task(self.periodic_deferrer(), name="deferrer"),
asyncio.create_task(self._poll_jobs_to_abort(), name="poll_jobs_to_abort"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the 2 polling operations are independent, should the intervals be controlled by the same variable ? (can be handled in a different PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it too. We can expose 2 different settings. Happy for it to be a separate PR.

tests/acceptance/test_async.py Outdated Show resolved Hide resolved
tests/acceptance/test_async.py Show resolved Hide resolved
Copy link
Member

@ewjoachim ewjoachim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're almost there. This should_abort_async question and unless I forgot something, we're good !

Copy link
Member

@ewjoachim ewjoachim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last suggestion on your last commit, and I'm good.

procrastinate/worker.py Outdated Show resolved Hide resolved
@onlyann onlyann requested a review from ewjoachim September 3, 2024 11:57
@ewjoachim
Copy link
Member

coverage is down but it's not your fault. I'll fix it and you'll need to rebase.

@ewjoachim
Copy link
Member

#1179
PR should be merged quickly. Sorry for the mess (it's all GitHub's fault :D )

@onlyann
Copy link
Contributor Author

onlyann commented Sep 5, 2024

#1179 PR should be merged quickly. Sorry for the mess (it's all GitHub's fault :D )

No worries. I can rebase once it lands in v3 branch

@ewjoachim
Copy link
Member

v3 is up to date :)

@onlyann onlyann force-pushed the cancel-notification branch 5 times, most recently from a5f7619 to 1511f31 Compare September 6, 2024 11:10
@onlyann onlyann force-pushed the cancel-notification branch from 1511f31 to 2fd31bb Compare September 6, 2024 11:10
@onlyann
Copy link
Contributor Author

onlyann commented Sep 6, 2024

Rebased and squashed
All 🟢 🥳

@ewjoachim ewjoachim merged commit 5719510 into procrastinate-org:v3 Sep 6, 2024
11 checks passed
@ewjoachim
Copy link
Member

And Merged !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
PR type: breaking 💥 Contains breaking changes PR type: feature ⭐️ Contains new features
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants