
Fix dataset downloading #7864

Merged 43 commits into develop from zm/fix-dataset-downloading on May 28, 2024

Commits (43)
d7bb1c1
Make dataset cache lifetime configurable
zhiltsov-max May 2, 2024
7f151b1
Use flock to lock export cache
zhiltsov-max May 3, 2024
f48e807
- use redlock for export cache locking
zhiltsov-max May 7, 2024
4c58bf5
update server deps
zhiltsov-max May 7, 2024
d7fa31d
Merge branch 'develop' into zm/fix-dataset-downloading
zhiltsov-max May 8, 2024
5c22d28
Fix deps after merge
zhiltsov-max May 8, 2024
49baadf
Refactor some code
zhiltsov-max May 8, 2024
de73fe6
fix import
zhiltsov-max May 8, 2024
35ce1f6
Fix linter
zhiltsov-max May 8, 2024
7b671f5
Update changelog
zhiltsov-max May 8, 2024
4a4b5d0
Fix failing cloud storage tests
zhiltsov-max May 9, 2024
05f6dd1
Merge branch 'develop' into zm/fix-dataset-downloading
zhiltsov-max May 9, 2024
2c4be73
Add rough test
zhiltsov-max May 14, 2024
5dc3e56
Refactor code
zhiltsov-max May 14, 2024
4c24e10
Refactor test
zhiltsov-max May 14, 2024
98ad20e
Merge remote-tracking branch 'origin/zm/fix-dataset-downloading' into…
zhiltsov-max May 14, 2024
963f657
Refactor some code
zhiltsov-max May 14, 2024
2bf19db
Update deps
zhiltsov-max May 14, 2024
c0769f0
Add redlock docs link
zhiltsov-max May 14, 2024
087ac42
Add more tests
zhiltsov-max May 14, 2024
0d75d4a
Add more tests
zhiltsov-max May 14, 2024
ea71c7d
Add more tests, update export test
zhiltsov-max May 14, 2024
7085076
Update deps
zhiltsov-max May 16, 2024
f8d142f
Add rq cleanup to tests with multiple iterations
zhiltsov-max May 16, 2024
9479888
Update comment
zhiltsov-max May 16, 2024
da06903
Remove unused imports
zhiltsov-max May 16, 2024
700bf2c
Refactor lock release
zhiltsov-max May 16, 2024
9ccafc5
Rename lock function
zhiltsov-max May 16, 2024
c2b3049
Add api compatibility test
zhiltsov-max May 16, 2024
228e82e
Merge branch 'develop' into zm/fix-dataset-downloading
zhiltsov-max May 16, 2024
4ddc79d
Fix tests
zhiltsov-max May 16, 2024
447b4df
Merge remote-tracking branch 'origin/zm/fix-dataset-downloading' into…
zhiltsov-max May 16, 2024
2a67f2e
Merge branch 'develop' into zm/fix-dataset-downloading
zhiltsov-max May 17, 2024
4471aa9
Restore old job removal behavior
zhiltsov-max May 17, 2024
de2c805
Merge remote-tracking branch 'origin/zm/fix-dataset-downloading' into…
zhiltsov-max May 17, 2024
e76382d
Update downloading test
zhiltsov-max May 17, 2024
2af76a8
Fix comments
zhiltsov-max May 27, 2024
aafc369
Remove unused import
zhiltsov-max May 27, 2024
275a274
Raise an exception on unlocking expired lock
zhiltsov-max May 28, 2024
297d686
Remove unexpected requirements changes
zhiltsov-max May 28, 2024
8d5401c
Merge branch 'develop' into zm/fix-dataset-downloading
zhiltsov-max May 28, 2024
664ed28
Remove extra requirements
zhiltsov-max May 28, 2024
6fc3f63
Merge remote-tracking branch 'origin/zm/fix-dataset-downloading' into…
zhiltsov-max May 28, 2024
Files changed (changes from 12 commits)
@@ -0,0 +1,5 @@
### Fixed

- The 500 / "The result file does not exist in export cache" error
on dataset export request
(<https://github.com/cvat-ai/cvat/pull/7864>)
18 changes: 18 additions & 0 deletions cvat/apps/dataset_manager/apps.py
@@ -0,0 +1,18 @@
# Copyright (C) 2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

from django.apps import AppConfig


class DatasetManagerConfig(AppConfig):
name = "cvat.apps.dataset_manager"

def ready(self) -> None:
from django.conf import settings

from . import default_settings

for key in dir(default_settings):
if key.isupper() and not hasattr(settings, key):
setattr(settings, key, getattr(default_settings, key))
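
The ready() hook above backfills defaults: every uppercase attribute of default_settings that the deployment's Django settings do not already define is copied over, so explicit settings always win. A minimal standalone sketch of the same pattern, with plain namespaces standing in for the defaults module and django.conf.settings (all names and values illustrative):

# Standalone sketch of the settings-backfill pattern in DatasetManagerConfig.ready().
# SimpleNamespace objects stand in for the defaults module and django.conf.settings.
from types import SimpleNamespace

default_settings = SimpleNamespace(DATASET_CACHE_TTL=36000)
settings = SimpleNamespace(DATASET_CACHE_TTL=7200)  # value set by the deployment

for key in dir(default_settings):
    if key.isupper() and not hasattr(settings, key):
        setattr(settings, key, getattr(default_settings, key))

assert settings.DATASET_CACHE_TTL == 7200  # the explicit setting is preserved
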
11 changes: 11 additions & 0 deletions cvat/apps/dataset_manager/default_settings.py
@@ -0,0 +1,11 @@
# Copyright (C) 2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import os

DATASET_CACHE_TTL = int(os.getenv("CVAT_DATASET_CACHE_TTL", 60 * 60 * 10))
"Base lifetime for cached exported datasets, in seconds"

DATASET_CACHE_LOCK_TIMEOUT = int(os.getenv("CVAT_DATASET_CACHE_LOCK_TIMEOUT", 10))
"Timeout for cache lock acquiring, in seconds"
65 changes: 62 additions & 3 deletions cvat/apps/dataset_manager/util.py
@@ -1,16 +1,22 @@

# Copyright (C) 2019-2022 Intel Corporation
# Copyright (C) 2023-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

from contextlib import contextmanager, suppress
from copy import deepcopy
from typing import Sequence
from datetime import timedelta
from threading import Lock
from typing import Any, Generator, Optional, Sequence
import inspect
import os, os.path as osp
import os
import os.path as osp
import zipfile

import django_rq
from django.conf import settings
from django.db import models
from pottery import Redlock, ReleaseUnlockedLock


def current_function_name(depth=1):
@@ -80,3 +86,56 @@ def deepcopy_simple(v):
return v
else:
return deepcopy(v)



class LockError(Exception):
pass


class LockTimeoutError(LockError):
pass


class LockNotAvailableError(LockError):
pass


@contextmanager
def get_dataset_cache_lock(
export_path: os.PathLike[str],
*,
ttl: int | timedelta,
block: bool = True,
acquire_timeout: Optional[int | timedelta] = None,
) -> Generator[Lock, Any, Any]:
if isinstance(acquire_timeout, timedelta):
acquire_timeout = acquire_timeout.total_seconds()
elif acquire_timeout and acquire_timeout < 0:
raise ValueError("acquire_timeout must be a positive number")
elif acquire_timeout is None:
acquire_timeout = -1

if isinstance(ttl, timedelta):
ttl = ttl.total_seconds()
elif not ttl or ttl < 0:
raise ValueError("ttl must be a positive number")

lock = Redlock(
key=export_path,
masters={django_rq.get_connection(settings.CVAT_QUEUES.EXPORT_DATA.value)},
auto_release_time=ttl,
)
try:
acquired = lock.acquire(blocking=block, timeout=acquire_timeout)
if acquired:
yield lock
else:
if 0 < acquire_timeout:
raise LockTimeoutError
else:
raise LockNotAvailableError

finally:
with suppress(ReleaseUnlockedLock):
lock.release()
Review thread on lock.release() (resolved)

SpecLad (Contributor) commented on May 16, 2024:
Maybe we could at least log a warning if the lock was unlocked? It seems like it could cause hard-to-debug issues, so some diagnostics would be helpful.

zhiltsov-max (Contributor, Author):
Updated.

SpecLad (Contributor):
But there's still no diagnostic.

zhiltsov-max (Contributor, Author):
If it happens, it is logged in both worker callbacks as a regular exception. For requests, it can be logged there once such logic exists.

SpecLad (Contributor):
I don't understand how it can be logged when get_export_cache_lock doesn't report this case in any way. Maybe we're talking about different things? I'm talking about the scenario where the code within the scope of the context manager takes too long and the lock unlocks by itself.

zhiltsov-max (Contributor, Author) commented on May 28, 2024:
Ok, restored the exception. However, I only expect these exceptions to be reachable in the downloading endpoints, as the rq workers should be killed before the lock expires if they run for too long.
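
For context, a minimal usage sketch of the context manager discussed above, assuming a configured Django environment with the export queue's Redis master reachable; the path and durations are illustrative, not taken from the PR:

# Hedged usage sketch for get_dataset_cache_lock. Requires a configured Django
# environment; the export path and durations below are illustrative only.
from datetime import timedelta

from cvat.apps.dataset_manager.util import LockError, get_dataset_cache_lock

export_path = "/data/tasks/42/export_cache/annotations.zip"  # hypothetical path

try:
    with get_dataset_cache_lock(
        export_path,
        ttl=timedelta(minutes=5),               # auto-release if the holder dies
        block=True,
        acquire_timeout=timedelta(seconds=10),  # LockTimeoutError if not acquired in time
    ):
        ...  # read or (re)write the cached export file while holding the lock
except LockError:
    ...  # busy or timed out; the PR retries the enqueued rq job in this case
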

188 changes: 139 additions & 49 deletions cvat/apps/dataset_manager/views.py
@@ -1,14 +1,18 @@
# Copyright (C) 2019-2022 Intel Corporation
# Copyright (C) 2023 CVAT.ai Corporation
# Copyright (C) 2023-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

from contextlib import suppress
import logging
import os
import os.path as osp
import re
import tempfile
from datetime import timedelta

import django_rq
import rq
from datumaro.util.os_util import make_file_name
from datumaro.util import to_snake_case
from django.utils import timezone
@@ -20,7 +24,7 @@
from cvat.apps.engine.models import Project, Task, Job

from .formats.registry import EXPORT_FORMATS, IMPORT_FORMATS
from .util import current_function_name
from .util import current_function_name, get_dataset_cache_lock, LockError

slogger = ServerLogManager(__name__)

@@ -32,78 +36,123 @@ def log_exception(logger=None, exc_info=True):
(_MODULE_NAME, current_function_name(2)),
exc_info=exc_info)

EXPORT_CACHE_DIR_NAME = 'export_cache'

def get_export_cache_dir(db_instance):
base_dir = osp.abspath(db_instance.get_dirname())

if osp.isdir(base_dir):
return osp.join(base_dir, 'export_cache')
return osp.join(base_dir, EXPORT_CACHE_DIR_NAME)
else:
raise FileNotFoundError('{} dir {} does not exist'.format(db_instance.__class__.__name__, base_dir))
raise FileNotFoundError(
'{} dir {} does not exist'.format(db_instance.__class__.__name__, base_dir)
)

DEFAULT_CACHE_TTL = timedelta(hours=10)
TASK_CACHE_TTL = DEFAULT_CACHE_TTL
DEFAULT_CACHE_TTL = timedelta(seconds=settings.DATASET_CACHE_TTL)
PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL / 3
TASK_CACHE_TTL = DEFAULT_CACHE_TTL
JOB_CACHE_TTL = DEFAULT_CACHE_TTL

EXPORT_CACHE_LOCK_TIMEOUT = timedelta(seconds=settings.DATASET_CACHE_LOCK_TIMEOUT)

def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> int:
TTL_CONSTS = {
'project': PROJECT_CACHE_TTL,
'task': TASK_CACHE_TTL,
'job': JOB_CACHE_TTL,
}

if not isinstance(db_instance, str):
db_instance = db_instance.__class__.__name__.lower()

return TTL_CONSTS[db_instance].total_seconds()

def get_file_instance_timestamp(file_path: str) -> float:
match = re.search(r'instance(\d+\.\d+)', osp.basename(file_path))
return float(match.group(1))


def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
try:
if task_id is not None:
logger = slogger.task[task_id]
cache_ttl = TASK_CACHE_TTL
export_fn = task.export_task
db_instance = Task.objects.get(pk=task_id)
elif project_id is not None:
logger = slogger.project[project_id]
cache_ttl = PROJECT_CACHE_TTL
export_fn = project.export_project
db_instance = Project.objects.get(pk=project_id)
else:
logger = slogger.job[job_id]
cache_ttl = JOB_CACHE_TTL
export_fn = task.export_job
db_instance = Job.objects.get(pk=job_id)

cache_dir = get_export_cache_dir(db_instance)
cache_ttl = timedelta(seconds=get_export_cache_ttl(db_instance))

cache_dir = get_export_cache_dir(db_instance)
exporter = EXPORT_FORMATS[dst_format]
output_base = '%s_%s' % ('dataset' if save_images else 'annotations',
make_file_name(to_snake_case(dst_format)))
output_path = '%s.%s' % (output_base, exporter.EXT)
output_path = osp.join(cache_dir, output_path)

instance_time = timezone.localtime(db_instance.updated_date).timestamp()
instance_update_time = timezone.localtime(db_instance.updated_date)
if isinstance(db_instance, Project):
tasks_update = list(map(lambda db_task: timezone.localtime(
db_task.updated_date).timestamp(), db_instance.tasks.all()))
instance_time = max(tasks_update + [instance_time])
if not (osp.exists(output_path) and \
instance_time <= osp.getmtime(output_path)):
os.makedirs(cache_dir, exist_ok=True)
with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
temp_file = osp.join(temp_dir, 'result')
export_fn(db_instance.id, temp_file, dst_format,
server_url=server_url, save_images=save_images)
os.replace(temp_file, output_path)

archive_ctime = osp.getctime(output_path)
scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
cleaning_job = scheduler.enqueue_in(time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
file_ctime=archive_ctime,
logger=logger)
logger.info(
"The {} '{}' is exported as '{}' at '{}' "
"and available for downloading for the next {}. "
"Export cache cleaning job is enqueued, id '{}'".format(
db_instance.__class__.__name__.lower(),
db_instance.name if isinstance(db_instance, (Project, Task)) else db_instance.id,
dst_format, output_path, cache_ttl,
cleaning_job.id
))
tasks_update = list(map(
lambda db_task: timezone.localtime(db_task.updated_date),
db_instance.tasks.all()
))
instance_update_time = max(tasks_update + [instance_update_time])

output_path = '%s-instance%s_%s.%s' % (
'dataset' if save_images else 'annotations',
# store the instance timestamp in the file name to reliably get this information
# ctime / mtime do not return file creation time on linux
# mtime is used for file usage checks
instance_update_time.timestamp(),
make_file_name(to_snake_case(dst_format)),
exporter.EXT,
)
output_path = osp.join(cache_dir, output_path)

os.makedirs(cache_dir, exist_ok=True)

with get_dataset_cache_lock(
output_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
ttl=rq.get_current_job().timeout,
):
if not (
osp.exists(output_path)
and instance_update_time.timestamp() <= get_file_instance_timestamp(output_path)
):
with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
temp_file = osp.join(temp_dir, 'result')
export_fn(db_instance.id, temp_file, dst_format,
server_url=server_url, save_images=save_images)
os.replace(temp_file, output_path)

scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
cleaning_job = scheduler.enqueue_in(
time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
file_ctime=instance_update_time.timestamp(),
logger=logger
)
logger.info(
"The {} '{}' is exported as '{}' at '{}' "
"and available for downloading for the next {}. "
"Export cache cleaning job is enqueued, id '{}'".format(
db_instance.__class__.__name__.lower(),
db_instance.name if isinstance(db_instance, (Project, Task)) else db_instance.id,
dst_format, output_path, cache_ttl,
cleaning_job.id
)
)

return output_path
except LockError:
rq_job = rq.get_current_job()
rq_job.retry(django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value))
return
except Exception:
log_exception(logger)
raise
Expand All @@ -127,14 +176,55 @@ def export_project_annotations(project_id, dst_format=None, server_url=None):
return export(dst_format, project_id=project_id, server_url=server_url, save_images=False)


def clear_export_cache(file_path, file_ctime, logger):
def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger) -> None:
try:
if osp.exists(file_path) and osp.getctime(file_path) == file_ctime:
with get_dataset_cache_lock(
file_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
ttl=rq.get_current_job().timeout,
):
if 'job' in file_path:
instance_type = 'job'
elif 'task' in file_path:
instance_type = 'task'
elif 'project' in file_path:
instance_type = 'project'
else:
assert False

cache_ttl = get_export_cache_ttl(instance_type)

instance_timestamp = None
with suppress(AttributeError): # backward compatibility
instance_timestamp = get_file_instance_timestamp(file_path)
if instance_timestamp and instance_timestamp != file_ctime:
logger.info("Export cache file '{}' has changed, skipping".format(file_path))
return

if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl:
# Need to retry later, the export is in use
rq_job = rq.get_current_job()
rq_job.retries_left = 1
rq_job.retry_intervals = [cache_ttl]
rq_job.retry(
django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value), pipeline=None
)
logger.info(
"Export cache file '{}' is recently accessed, will retry in {}".format(
file_path, timedelta(seconds=cache_ttl)
)
)
return

# TODO: maybe remove all outdated exports
os.remove(file_path)

logger.info(
"Export cache file '{}' successfully removed" \
.format(file_path))
logger.info("Export cache file '{}' successfully removed".format(file_path))
except LockError:
# Need to retry later if the lock was not available
rq_job = rq.get_current_job()
rq_job.retry(django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value), pipeline=None)
return
except Exception:
log_exception(logger)
raise
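
The cache file name doubles as metadata: because ctime and mtime do not reliably give creation time on Linux, the instance's update timestamp is embedded into the name by export() and parsed back with get_file_instance_timestamp when checking whether a cached export is stale. A self-contained sketch of that round-trip (the timestamp and format suffix are illustrative):

# Sketch of the filename round-trip used by export() and clear_export_cache().
# The timestamp and the 'coco_1_0' format suffix are illustrative values.
import os.path as osp
import re

def get_file_instance_timestamp(file_path: str) -> float:
    match = re.search(r'instance(\d+\.\d+)', osp.basename(file_path))
    return float(match.group(1))

instance_update_timestamp = 1715619375.25  # db_instance.updated_date as a timestamp
output_path = 'annotations-instance%s_coco_1_0.zip' % instance_update_timestamp

assert get_file_instance_timestamp(output_path) == instance_update_timestamp
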