
Fixed acquire_jobs method in MongoDB DataStore. #948

Closed
wants to merge 3 commits

Conversation

HK-Mattew
Contributor

Changes

Fixes #.

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.

Updating the changelog

If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #999, the entry should look like this:

* Fix big bad boo-boo in the async scheduler (`#999 <https://github.com/agronholm/apscheduler/issues/999>`_; PR by @yourgithubaccount)

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.

@agronholm (Owner) left a comment

Left some comments, and you still need to add a test that would fail without the fix, and a changelog note.

Comment on lines 689 to 693
task_slots_left = task_job_slots_left.get(job.task_id, float("inf"))
if (
    not task_slots_left
    or running_job_count_increments[job.task_id] == task_slots_left
):
Owner

I think this would be the correct fix?

Suggested change
task_slots_left = task_job_slots_left.get(job.task_id, float("inf"))
if (
    not task_slots_left
    or running_job_count_increments[job.task_id] == task_slots_left
):
if task_job_slots_left.get(job.task_id, float("inf")):

Contributor Author

Hello,

I did this because more jobs were being executed than they should be. It was running without limits.

Contributor Author

Let me analyze your change suggestion better. I haven't looked at it yet.

Owner

The problem with my current approach with MongoDB is the lack of atomicity. Other schedulers might change the task's running jobs count while this scheduler is running acquire_jobs(), as there is no locking involved. I think a better way to do this is to run a conditional update that only updates the running job count if it's lower than the maximum job count. My initial attempt at this failed because find_and_update(), for whatever bizarre reason, doesn't support referencing existing fields in the filter. But the next best way would be to use the previously fetched maximum running job counts, as those rarely change.
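
For illustration, the conditional update described here could look roughly like this with pymongo. This is only a sketch: the collection handle and task_id are hypothetical, the field names max_running_jobs and running_jobs are taken from the discussion, and whether such a filter may reference other document fields is exactly what the rest of the thread works out.

    from pymongo import MongoClient

    # Hypothetical handles, for illustration only
    tasks = MongoClient()["apscheduler"]["tasks"]
    task_id = "example-task"

    # Increment the running job count only while it is below the task's maximum;
    # matched_count == 0 means the task already has the maximum number of jobs running.
    # (A task with no limit, i.e. max_running_jobs == None, would need an extra $or
    # branch, as in the final fix further down in this thread.)
    result = tasks.update_one(
        {
            "_id": task_id,
            "$expr": {"$lt": ["$running_jobs", "$max_running_jobs"]},
        },
        {"$inc": {"running_jobs": 1}},
    )
    if result.matched_count == 0:
        print("no free slot, skipping the job")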

Contributor Author

From what I understand, your change to

if task_job_slots_left.get(job.task_id, float("inf")):

Would cause all jobs whose task still has a positive number of slots left (such as 1, 2, 3, etc.) to be skipped.

Am I right or not?

Owner

From what I understand, your change to

if task_job_slots_left.get(job.task_id, float("inf")):

Would cause all jobs whose task still has a positive number of slots left (such as 1, 2, 3, etc.) to be skipped.

Am I right or not?

You're right – I was admittedly tired when writing that reply. But I think that part of my original logic was correct (if not task_job_slots_left.get(job.task_id, float("inf")):). What was wrong was my calculation of free slots (task_job_slots_left[doc["_id"]] = doc["max_running_jobs"]), as I should've deducted the number of slots in use from the max slots.
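
As a sketch of the corrected bookkeeping described above, with the free slots computed as the maximum minus the jobs already running (variable names follow the snippet under review; the sample data and the Job stand-in class are hypothetical):

    from collections import defaultdict

    # Hypothetical sample data, for illustration only
    task_docs = [{"_id": "task-a", "max_running_jobs": 2, "running_jobs": 1}]

    class Job:  # minimal stand-in for the real job objects
        def __init__(self, job_id: str, task_id: str) -> None:
            self.id = job_id
            self.task_id = task_id

    jobs = [Job("j1", "task-a"), Job("j2", "task-a")]

    task_job_slots_left: dict[str, float] = {}
    running_job_count_increments: defaultdict = defaultdict(int)

    for doc in task_docs:
        if doc["max_running_jobs"] is not None:
            # Deduct the slots already in use instead of storing the raw maximum
            task_job_slots_left[doc["_id"]] = doc["max_running_jobs"] - doc["running_jobs"]

    acquired = []
    for job in jobs:
        slots_left = task_job_slots_left.get(job.task_id, float("inf"))
        if slots_left - running_job_count_increments[job.task_id] <= 0:
            continue  # the task is already at its limit, so skip this job
        running_job_count_increments[job.task_id] += 1
        acquired.append(job.id)

    print(acquired)  # ['j1']: only one free slot, so the second job is skipped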

Contributor Author

Right. Your initial/original version was correct.

@agronholm
Owner

Actually, I think I know a better way to do this with MongoDB.

@HK-Mattew
Contributor Author

Actually, I think I know a better way to do this with MongoDB.

Using aggregation with filters?

@HK-Mattew
Contributor Author

@agronholm

You mentioned the problem of not having atomicity when incrementing running_jobs in the task. So I tried some possibilities to increment running_jobs atomically in mongodb. And I got a result that seems good to me.

See my gist, if you are interested: https://gist.github.com/HK-Mattew/098f714191b348ba6f424ea7add83a1a

This way it is possible to do the update operation atomically.

@agronholm
Owner

@agronholm

You mentioned the problem of not having atomicity when incrementing running_jobs in the task. So I tried some possibilities to increment running_jobs atomically in mongodb. And I got a result that seems good to me.

See my gist, if you are interested: https://gist.github.com/HK-Mattew/098f714191b348ba6f424ea7add83a1a

This way it is possible to do the update operation atomically.

I tried something like this, only to find out that you can't use $expr in an update filter. Did you actually try that query?

Another criticism is that if only one slot is available and the initial query picks two jobs for that task, both would be dropped here.

@HK-Mattew
Contributor Author

I tried something like this, only to find out that you can't use $expr in an update filter. Did you actually try that query?

I haven't tested the code itself. But I tested the mongodb query and it worked perfectly.

@HK-Mattew
Contributor Author

Another criticism is that if only one slot is available and the initial query picks two jobs for that task, both would be dropped here.

Indeed. However, I see that it is the best option at the moment. At least to avoid increasing running_jobs incorrectly and adding more jobs.

@agronholm
Owner

Another criticism is that if only one slot is available and the initial query picks two jobs for that task, both would be dropped here.

Indeed. However, I see that it is the best option at the moment. At least to avoid increasing running_jobs incorrectly and adding more jobs.

What I tried myself was doing the increments one by one, and then if the update count is 0, then we can conclude that the task didn't have any more room, yes?

@HK-Mattew
Contributor Author

What I tried myself was doing the increments one by one, and then if the update count is 0, then we can conclude that the task didn't have any more room, yes?

It works too.

It would just be a few more operations in the database, but maybe that wouldn't be a problem. At least not for me, but I don't know what other people's use cases might be.

@agronholm
Owner

What I tried myself was doing the increments one by one, and then if the update count is 0, then we can conclude that the task didn't have any more room, yes?

It works too.

It would just be a few more operations in the database, but maybe that wouldn't be a problem. At least not for me, but I don't know what other people's use cases might be.

When you say "it works too", what other approach works? As I pointed out, your approach would work incorrectly if two jobs for the same task are acquired and the task only has one open slot. Ditto with 2 open slots, 5 jobs acquired etc.

@HK-Mattew
Contributor Author

What I tried myself was doing the increments one by one, and then if the update count is 0, then we can conclude that the task didn't have any more room, yes?

It works too.
It would just be a few more operations in the database, but maybe that wouldn't be a problem. At least not for me, but I don't know what other people's use cases might be.

When you say "it works too", what other approach works? As I pointed out, your approach would work incorrectly if two jobs for the same task are acquired and the task only has one open slot. Ditto with 2 open slots, 5 jobs acquired etc.

Yes, I understand what you said.

Your implementation will certainly be the best option and will work 100%. My idea would work, but not 100%.

@agronholm
Owner

Alright, sounds like we're on the same page now. I'll experiment with your query tomorrow, or you can do it if you like in this PR.

@HK-Mattew
Contributor Author

Alright, sounds like we're on the same page now. I'll experiment with your query tomorrow, or you can do it if you like in this PR.

I could do that. However, I don't really know how to work with pull requests very well. This was the first pull request I created 😶

@agronholm
Owner

This is what I ended up with (passes all the existing tests):

                    # Try to increment the task's running jobs count
                    update_task_result = await to_thread.run_sync(
                        lambda: self._tasks.update_one(
                            {
                                "_id": job.task_id,
                                "$or": [
                                    {"max_running_jobs": None},
                                    {
                                        "$expr": {
                                            "$gt": [
                                                "$max_running_jobs",
                                                "$running_jobs",
                                            ]
                                        }
                                    },
                                ],
                            },
                            {"$inc": {"running_jobs": 1}},
                            session=session,
                        )
                    )
                    if not update_task_result.matched_count:
                        self._logger.debug(
                            "Skipping job %s because task %r has the maximum number of "
                            "jobs already running",
                            job.id,
                        )
                        skipped_job_ids.append(job.id)
                        continue

@HK-Mattew
Contributor Author

This is what I ended up with (passes all the existing tests):

Well, I would do it the same way.

It's just that the debug log is missing an argument.

@agronholm
Owner

Right, the job.task_id argument was missing. I've added that now. I'll also add a regression test that fails with the unmodified code.
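
For reference, the corrected logging call would presumably just pass the task id as the second argument, along the lines of this sketch (not the exact committed code):

    self._logger.debug(
        "Skipping job %s because task %r has the maximum number of "
        "jobs already running",
        job.id,
        job.task_id,
    )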

@agronholm closed this in c9a0017 on Aug 2, 2024
@agronholm
Owner

Alright, it's fixed now. Thanks!
