diff --git a/cvat/apps/lambda_manager/views.py b/cvat/apps/lambda_manager/views.py
index 15b96a2e796..c682dc76af8 100644
--- a/cvat/apps/lambda_manager/views.py
+++ b/cvat/apps/lambda_manager/views.py
@@ -48,7 +48,12 @@
 from cvat.apps.engine.rq_job_handler import RQId, RQJobMetaField
 from cvat.apps.engine.serializers import LabeledDataSerializer
 from cvat.apps.engine.types import ExtendedRequest
-from cvat.apps.engine.utils import define_dependent_job, get_rq_job_meta, get_rq_lock_by_user
+from cvat.apps.engine.utils import (
+    define_dependent_job,
+    get_rq_job_meta,
+    get_rq_lock_by_user,
+    get_rq_lock_for_job,
+)
 from cvat.apps.events.handlers import handle_function_call
 from cvat.apps.iam.filters import ORGANIZATION_OPEN_API_PARAMETERS
 from cvat.apps.lambda_manager.models import FunctionKind
@@ -608,65 +613,55 @@ def enqueue(
         queue = self._get_queue()
         rq_id = RQId(RequestAction.AUTOANNOTATE, RequestTarget.TASK, task).render()
 
-        # It is still possible to run several concurrent jobs for the same task.
-        # But the race isn't critical. The filtration is just a light-weight
-        # protection.
-        rq_job = queue.fetch_job(rq_id)
-
-        have_conflict = rq_job and rq_job.get_status(refresh=False) not in {
-            rq.job.JobStatus.FAILED,
-            rq.job.JobStatus.FINISHED,
-        }
-
-        # There could be some jobs left over from before the current naming convention was adopted.
-        # TODO: remove this check after a few releases.
-        have_legacy_conflict = any(
-            job.get_task() == task and not (job.is_finished or job.is_failed)
-            for job in self.get_jobs()
-        )
-        if have_conflict or have_legacy_conflict:
-            raise ValidationError(
-                "Only one running request is allowed for the same task #{}".format(task),
-                code=status.HTTP_409_CONFLICT,
-            )
-
-        if rq_job:
-            rq_job.delete()
-
-        # LambdaJob(None) is a workaround for python-rq. It has multiple issues
-        # with invocation of non-trivial functions. For example, it cannot run
-        # staticmethod, it cannot run a callable class. Thus I provide an object
-        # which has __call__ function.
-        user_id = request.user.id
-
-        with get_rq_lock_by_user(queue, user_id):
-            rq_job = queue.create_job(
-                LambdaJob(None),
-                job_id=rq_id,
-                meta={
-                    **get_rq_job_meta(
-                        request,
-                        db_obj=(Job.objects.get(pk=job) if job else Task.objects.get(pk=task)),
-                    ),
-                    RQJobMetaField.FUNCTION_ID: lambda_func.id,
-                    "lambda": True,
-                },
-                kwargs={
-                    "function": lambda_func,
-                    "threshold": threshold,
-                    "task": task,
-                    "job": job,
-                    "cleanup": cleanup,
-                    "conv_mask_to_poly": conv_mask_to_poly,
-                    "mapping": mapping,
-                    "max_distance": max_distance,
-                },
-                depends_on=define_dependent_job(queue, user_id),
-                result_ttl=self.RESULT_TTL.total_seconds(),
-                failure_ttl=self.FAILED_TTL.total_seconds(),
-            )
+        # Ensure that there is no race condition when processing parallel requests.
+        # Enqueuing an RQ job with (queue, user) lock but without (queue, rq_id) lock
+        # may lead to queue jamming for a user due to self-dependencies.
+        with get_rq_lock_for_job(queue, rq_id):
+            if rq_job := queue.fetch_job(rq_id):
+                if rq_job.get_status(refresh=False) not in {
+                    rq.job.JobStatus.FAILED,
+                    rq.job.JobStatus.FINISHED,
+                }:
+                    raise ValidationError(
+                        "Only one running request is allowed for the same task #{}".format(task),
+                        code=status.HTTP_409_CONFLICT,
+                    )
+                rq_job.delete()
+
+            # LambdaJob(None) is a workaround for python-rq. It has multiple issues
+            # with invocation of non-trivial functions. For example, it cannot run
+            # staticmethod, it cannot run a callable class. Thus I provide an object
+            # which has __call__ function.
+            user_id = request.user.id
+
+            with get_rq_lock_by_user(queue, user_id):
+                rq_job = queue.create_job(
+                    LambdaJob(None),
+                    job_id=rq_id,
+                    meta={
+                        **get_rq_job_meta(
+                            request,
+                            db_obj=(Job.objects.get(pk=job) if job else Task.objects.get(pk=task)),
+                        ),
+                        RQJobMetaField.FUNCTION_ID: lambda_func.id,
+                        "lambda": True,
+                    },
+                    kwargs={
+                        "function": lambda_func,
+                        "threshold": threshold,
+                        "task": task,
+                        "job": job,
+                        "cleanup": cleanup,
+                        "conv_mask_to_poly": conv_mask_to_poly,
+                        "mapping": mapping,
+                        "max_distance": max_distance,
+                    },
+                    depends_on=define_dependent_job(queue, user_id),
+                    result_ttl=self.RESULT_TTL.total_seconds(),
+                    failure_ttl=self.FAILED_TTL.total_seconds(),
+                )
 
-        queue.enqueue_job(rq_job)
+            queue.enqueue_job(rq_job)
 
         return LambdaJob(rq_job)
diff --git a/cvat/apps/quality_control/quality_reports.py b/cvat/apps/quality_control/quality_reports.py
index 87aef7e82b3..8be628a5bed 100644
--- a/cvat/apps/quality_control/quality_reports.py
+++ b/cvat/apps/quality_control/quality_reports.py
@@ -53,7 +53,12 @@
     ValidationMode,
 )
 from cvat.apps.engine.types import ExtendedRequest
-from cvat.apps.engine.utils import define_dependent_job, get_rq_job_meta, get_rq_lock_by_user
+from cvat.apps.engine.utils import (
+    define_dependent_job,
+    get_rq_job_meta,
+    get_rq_lock_by_user,
+    get_rq_lock_for_job,
+)
 from cvat.apps.profiler import silk_profile
 from cvat.apps.quality_control import models
 from cvat.apps.quality_control.models import (
@@ -2274,11 +2279,11 @@ def schedule_custom_quality_check_job(
         self._check_quality_reporting_available(task)
 
         queue = self._get_queue()
+        rq_id = self._make_custom_quality_check_job_id(task_id=task.id, user_id=user_id)
 
-        with get_rq_lock_by_user(queue, user_id=user_id):
-            rq_id = self._make_custom_quality_check_job_id(task_id=task.id, user_id=user_id)
-            rq_job = queue.fetch_job(rq_id)
-            if rq_job:
+        # ensure that there is no race condition when processing parallel requests
+        with get_rq_lock_for_job(queue, rq_id):
+            if rq_job := queue.fetch_job(rq_id):
                 if rq_job.get_status(refresh=False) in (
                     rq.job.JobStatus.QUEUED,
                     rq.job.JobStatus.STARTED,
@@ -2289,19 +2294,20 @@ def schedule_custom_quality_check_job(
                 rq_job.delete()
 
-            dependency = define_dependent_job(
-                queue, user_id=user_id, rq_id=rq_id, should_be_dependent=True
-            )
+            with get_rq_lock_by_user(queue, user_id=user_id):
+                dependency = define_dependent_job(
+                    queue, user_id=user_id, rq_id=rq_id, should_be_dependent=True
+                )
 
-            queue.enqueue(
-                self._check_task_quality,
-                task_id=task.id,
-                job_id=rq_id,
-                meta=get_rq_job_meta(request=request, db_obj=task),
-                result_ttl=self._JOB_RESULT_TTL,
-                failure_ttl=self._JOB_RESULT_TTL,
-                depends_on=dependency,
-            )
+                queue.enqueue(
+                    self._check_task_quality,
+                    task_id=task.id,
+                    job_id=rq_id,
+                    meta=get_rq_job_meta(request=request, db_obj=task),
+                    result_ttl=self._JOB_RESULT_TTL,
+                    failure_ttl=self._JOB_RESULT_TTL,
+                    depends_on=dependency,
+                )
 
         return rq_id