Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix infinite lock for chunk preparing #8769

Merged
merged 4 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Possible endless lock acquisition for chunk preparation job
(<https://github.com/cvat-ai/cvat/pull/8769>)
42 changes: 23 additions & 19 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ def enqueue_create_chunk_job(
rq_job_id: str,
create_callback: Callback,
*,
blocking_timeout: int = 50,
rq_job_result_ttl: int = 60,
rq_job_failure_ttl: int = 3600 * 24 * 14, # 2 weeks
) -> rq.job.Job:
try:
with get_rq_lock_for_job(queue, rq_job_id, blocking_timeout=blocking_timeout):
with get_rq_lock_for_job(queue, rq_job_id):
rq_job = queue.fetch_job(rq_job_id)

if not rq_job:
Expand Down Expand Up @@ -199,11 +198,13 @@ def _get_or_set_cache_item(
cache_item_ttl=cache_item_ttl,
)

def _get_queue(self) -> rq.Queue:
return django_rq.get_queue(self._QUEUE_NAME)
@classmethod
def _get_queue(cls) -> rq.Queue:
return django_rq.get_queue(cls._QUEUE_NAME)

def _make_queue_job_id(self, key: str) -> str:
return f"{self._QUEUE_JOB_PREFIX_TASK}{key}"
@classmethod
def _make_queue_job_id(cls, key: str) -> str:
return f"{cls._QUEUE_JOB_PREFIX_TASK}{key}"

@staticmethod
def _drop_return_value(func: Callable[..., DataWithMime], *args: Any, **kwargs: Any):
Expand All @@ -222,7 +223,15 @@ def _create_and_set_cache_item(
item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes), timestamp)
if item_data_bytes:
cache = cls._cache()
cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout)
with get_rq_lock_for_job(
cls._get_queue(),
key,
):
cached_item = cache.get(key)
if cached_item is not None and timestamp <= cached_item[3]:
item = cached_item
else:
cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout)

return item

Expand All @@ -233,22 +242,17 @@ def _create_cache_item(
*,
cache_item_ttl: Optional[int] = None,
) -> _CacheItem:

queue = self._get_queue()
rq_id = self._make_queue_job_id(key)

slogger.glob.info(f"Starting to prepare chunk: key {key}")
if _is_run_inside_rq():
with get_rq_lock_for_job(queue, rq_id, timeout=None, blocking_timeout=None):
item = self._create_and_set_cache_item(
key,
create_callback,
cache_item_ttl=cache_item_ttl,
)
item = self._create_and_set_cache_item(
key,
create_callback,
cache_item_ttl=cache_item_ttl,
)
else:
rq_job = enqueue_create_chunk_job(
queue=queue,
rq_job_id=rq_id,
queue=self._get_queue(),
rq_job_id=self._make_queue_job_id(key),
create_callback=Callback(
callable=self._drop_return_value,
args=[
Expand Down
5 changes: 4 additions & 1 deletion cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,11 @@ def get_rq_lock_by_user(queue: DjangoRQ, user_id: int, *, timeout: Optional[int]
)
return nullcontext()

def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str, *, timeout: Optional[int] = 60, blocking_timeout: Optional[int] = None) -> Lock:
def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str, *, timeout: int = 60, blocking_timeout: int = 50) -> Lock:
# lock timeout corresponds to the nginx request timeout (proxy_read_timeout)

assert timeout is not None
assert blocking_timeout is not None
return queue.connection.lock(
name=f'lock-for-job-{rq_id}'.lower(),
timeout=timeout,
Expand Down
Loading