Skip to content

Commit

Permalink
Fix more potential race conditions when fetching and enqueuing RQ jobs (#9120)
Browse files Browse the repository at this point in the history

This PR is a continuation of #9102.
  • Loading branch information
Marishka17 authored Feb 20, 2025
1 parent 558a861 commit fd09656
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 76 deletions.
113 changes: 54 additions & 59 deletions cvat/apps/lambda_manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
from cvat.apps.engine.rq_job_handler import RQId, RQJobMetaField
from cvat.apps.engine.serializers import LabeledDataSerializer
from cvat.apps.engine.types import ExtendedRequest
from cvat.apps.engine.utils import define_dependent_job, get_rq_job_meta, get_rq_lock_by_user
from cvat.apps.engine.utils import (
define_dependent_job,
get_rq_job_meta,
get_rq_lock_by_user,
get_rq_lock_for_job,
)
from cvat.apps.events.handlers import handle_function_call
from cvat.apps.iam.filters import ORGANIZATION_OPEN_API_PARAMETERS
from cvat.apps.lambda_manager.models import FunctionKind
Expand Down Expand Up @@ -608,65 +613,55 @@ def enqueue(
queue = self._get_queue()
rq_id = RQId(RequestAction.AUTOANNOTATE, RequestTarget.TASK, task).render()

# It is still possible to run several concurrent jobs for the same task.
# But the race isn't critical. The filtration is just a light-weight
# protection.
rq_job = queue.fetch_job(rq_id)

have_conflict = rq_job and rq_job.get_status(refresh=False) not in {
rq.job.JobStatus.FAILED,
rq.job.JobStatus.FINISHED,
}

# There could be some jobs left over from before the current naming convention was adopted.
# TODO: remove this check after a few releases.
have_legacy_conflict = any(
job.get_task() == task and not (job.is_finished or job.is_failed)
for job in self.get_jobs()
)
if have_conflict or have_legacy_conflict:
raise ValidationError(
"Only one running request is allowed for the same task #{}".format(task),
code=status.HTTP_409_CONFLICT,
)

if rq_job:
rq_job.delete()

# LambdaJob(None) is a workaround for python-rq. It has multiple issues
# with invocation of non-trivial functions. For example, it cannot run
# staticmethod, it cannot run a callable class. Thus I provide an object
# which has __call__ function.
user_id = request.user.id

with get_rq_lock_by_user(queue, user_id):
rq_job = queue.create_job(
LambdaJob(None),
job_id=rq_id,
meta={
**get_rq_job_meta(
request,
db_obj=(Job.objects.get(pk=job) if job else Task.objects.get(pk=task)),
),
RQJobMetaField.FUNCTION_ID: lambda_func.id,
"lambda": True,
},
kwargs={
"function": lambda_func,
"threshold": threshold,
"task": task,
"job": job,
"cleanup": cleanup,
"conv_mask_to_poly": conv_mask_to_poly,
"mapping": mapping,
"max_distance": max_distance,
},
depends_on=define_dependent_job(queue, user_id),
result_ttl=self.RESULT_TTL.total_seconds(),
failure_ttl=self.FAILED_TTL.total_seconds(),
)
# Ensure that there is no race condition when processing parallel requests.
# Enqueuing an RQ job with (queue, user) lock but without (queue, rq_id) lock
# may lead to queue jamming for a user due to self-dependencies.
with get_rq_lock_for_job(queue, rq_id):
if rq_job := queue.fetch_job(rq_id):
if rq_job.get_status(refresh=False) not in {
rq.job.JobStatus.FAILED,
rq.job.JobStatus.FINISHED,
}:
raise ValidationError(
"Only one running request is allowed for the same task #{}".format(task),
code=status.HTTP_409_CONFLICT,
)
rq_job.delete()

# LambdaJob(None) is a workaround for python-rq. It has multiple issues
# with invocation of non-trivial functions. For example, it cannot run
# staticmethod, it cannot run a callable class. Thus I provide an object
# which has __call__ function.
user_id = request.user.id

with get_rq_lock_by_user(queue, user_id):
rq_job = queue.create_job(
LambdaJob(None),
job_id=rq_id,
meta={
**get_rq_job_meta(
request,
db_obj=(Job.objects.get(pk=job) if job else Task.objects.get(pk=task)),
),
RQJobMetaField.FUNCTION_ID: lambda_func.id,
"lambda": True,
},
kwargs={
"function": lambda_func,
"threshold": threshold,
"task": task,
"job": job,
"cleanup": cleanup,
"conv_mask_to_poly": conv_mask_to_poly,
"mapping": mapping,
"max_distance": max_distance,
},
depends_on=define_dependent_job(queue, user_id),
result_ttl=self.RESULT_TTL.total_seconds(),
failure_ttl=self.FAILED_TTL.total_seconds(),
)

queue.enqueue_job(rq_job)
queue.enqueue_job(rq_job)

return LambdaJob(rq_job)

Expand Down
40 changes: 23 additions & 17 deletions cvat/apps/quality_control/quality_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@
ValidationMode,
)
from cvat.apps.engine.types import ExtendedRequest
from cvat.apps.engine.utils import define_dependent_job, get_rq_job_meta, get_rq_lock_by_user
from cvat.apps.engine.utils import (
define_dependent_job,
get_rq_job_meta,
get_rq_lock_by_user,
get_rq_lock_for_job,
)
from cvat.apps.profiler import silk_profile
from cvat.apps.quality_control import models
from cvat.apps.quality_control.models import (
Expand Down Expand Up @@ -2274,11 +2279,11 @@ def schedule_custom_quality_check_job(
self._check_quality_reporting_available(task)

queue = self._get_queue()
rq_id = self._make_custom_quality_check_job_id(task_id=task.id, user_id=user_id)

with get_rq_lock_by_user(queue, user_id=user_id):
rq_id = self._make_custom_quality_check_job_id(task_id=task.id, user_id=user_id)
rq_job = queue.fetch_job(rq_id)
if rq_job:
# ensure that there is no race condition when processing parallel requests
with get_rq_lock_for_job(queue, rq_id):
if rq_job := queue.fetch_job(rq_id):
if rq_job.get_status(refresh=False) in (
rq.job.JobStatus.QUEUED,
rq.job.JobStatus.STARTED,
Expand All @@ -2289,19 +2294,20 @@ def schedule_custom_quality_check_job(

rq_job.delete()

dependency = define_dependent_job(
queue, user_id=user_id, rq_id=rq_id, should_be_dependent=True
)
with get_rq_lock_by_user(queue, user_id=user_id):
dependency = define_dependent_job(
queue, user_id=user_id, rq_id=rq_id, should_be_dependent=True
)

queue.enqueue(
self._check_task_quality,
task_id=task.id,
job_id=rq_id,
meta=get_rq_job_meta(request=request, db_obj=task),
result_ttl=self._JOB_RESULT_TTL,
failure_ttl=self._JOB_RESULT_TTL,
depends_on=dependency,
)
queue.enqueue(
self._check_task_quality,
task_id=task.id,
job_id=rq_id,
meta=get_rq_job_meta(request=request, db_obj=task),
result_ttl=self._JOB_RESULT_TTL,
failure_ttl=self._JOB_RESULT_TTL,
depends_on=dependency,
)

return rq_id

Expand Down

0 comments on commit fd09656

Please sign in to comment.