diff --git a/.vscode/launch.json b/.vscode/launch.json index 0b02a0110cad..884ed02c3b6f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -270,6 +270,28 @@ "env": {}, "console": "internalConsole" }, + { + "name": "server: RQ - cleaning", + "type": "python", + "request": "launch", + "stopOnEntry": false, + "justMyCode": false, + "python": "${command:python.interpreterPath}", + "program": "${workspaceRoot}/manage.py", + "args": [ + "rqworker", + "cleaning", + "--worker-class", + "cvat.rqworker.SimpleWorker" + ], + "django": true, + "cwd": "${workspaceFolder}", + "env": { + "DJANGO_LOG_SERVER_HOST": "localhost", + "DJANGO_LOG_SERVER_PORT": "8282" + }, + "console": "internalConsole" + }, { "name": "server: migrate", "type": "python", @@ -433,6 +455,7 @@ "server: RQ - annotation", "server: RQ - webhooks", "server: RQ - scheduler", + "server: RQ - cleaning", "server: git", ] }
diff --git a/CHANGELOG.md b/CHANGELOG.md index 26608ea2d1ae..363130ed983e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support task creation with any type of data supported by the server by default from cloud storage without use_cache option () - Support task creation with cloud storage data and without use_cache option () +- Cleaning worker that checks whether an uploaded resource has been deleted, and deletes it otherwise () ### Changed - Resource links are opened from any organization/sandbox if available for user () @@ -31,6 +32,12 @@ without use_cache option () ### Fixed - Skeletons dumping on created tasks/projects () - Fix saving annotations for skeleton tracks () +- Wrong location of tmp file when importing job annotations () +- Removing the uploaded annotations/backup file when an rq job was created +but no further status-check requests were made () +- Removing the uploaded annotations/backup file when it was uploaded to the server via the TUS protocol +but the rq job has not yet been created () +- Creating tasks/projects from backups with the same name at the same time by different users () ### Security - TDB
diff --git a/cvat-core/src/server-proxy.ts b/cvat-core/src/server-proxy.ts index 1e7d4d205a23..35d15f2501f5 100644 --- a/cvat-core/src/server-proxy.ts +++ b/cvat-core/src/server-proxy.ts @@ -783,13 +783,14 @@ async function importDataset( }; const url = `${backendAPI}/projects/${id}/dataset`; + let rqId: string; async function wait() { return new Promise((resolve, reject) => { async function requestStatus() { try { const response = await Axios.get(url, { - params: { ...params, action: 'import_status' }, + params: { ...params, action: 'import_status', rq_id: rqId }, }); if (response.status === 202) { if (response.data.message) { @@ -812,10 +813,11 @@ async function importDataset( if (isCloudStorage) { try { - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } @@ -837,11 +839,12 @@ async function importDataset( headers: { 'Upload-Start': true }, }); await chunkUpload(file, uploadConfig); - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, headers: { 'Upload-Finish': true }, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } @@ -1617,6 +1620,7 @@ async function uploadAnnotations( filename: typeof file === 'string' ?
file : file.name, conv_mask_to_poly: options.convMaskToPoly, }; + let rqId: string; const url = `${backendAPI}/${session}s/${id}/annotations`; async function wait() { @@ -1627,7 +1631,7 @@ async function uploadAnnotations( url, new FormData(), { - params, + params: { ...params, rq_id: rqId }, }, ); if (response.status === 202) { @@ -1646,10 +1650,11 @@ async function uploadAnnotations( if (isCloudStorage) { try { - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } @@ -1667,11 +1672,12 @@ async function uploadAnnotations( headers: { 'Upload-Start': true }, }); await chunkUpload(file, uploadConfig); - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, headers: { 'Upload-Finish': true }, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } diff --git a/cvat-sdk/cvat_sdk/core/proxies/tasks.py b/cvat-sdk/cvat_sdk/core/proxies/tasks.py index 74e2b5fbac2c..29e84dc3545d 100644 --- a/cvat-sdk/cvat_sdk/core/proxies/tasks.py +++ b/cvat-sdk/cvat_sdk/core/proxies/tasks.py @@ -450,6 +450,6 @@ def create_from_backup( ) task_id = json.loads(response.data)["id"] - self._client.logger.info(f"Task has been imported sucessfully. Task ID: {task_id}") + self._client.logger.info(f"Task has been imported successfully. Task ID: {task_id}") return self.retrieve(task_id) diff --git a/cvat-sdk/cvat_sdk/core/uploading.py b/cvat-sdk/cvat_sdk/core/uploading.py index ceacda782ce3..e4635f69d88e 100644 --- a/cvat-sdk/cvat_sdk/core/uploading.py +++ b/cvat-sdk/cvat_sdk/core/uploading.py @@ -4,6 +4,7 @@ from __future__ import annotations +import json import os from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple @@ -89,6 +90,7 @@ def create_url(self): headers["upload-length"] = str(self.file_size) headers["upload-metadata"] = ",".join(self.encode_metadata()) resp = self._api_client.rest_client.POST(self.client.url, headers=headers) + self.real_filename = resp.headers.get("Upload-Filename") url = resp.headers.get("location") if url is None: msg = "Attempt to retrieve create file url with status {}".format(resp.status_code) @@ -179,9 +181,10 @@ def upload_file( assert meta["filename"] self._tus_start_upload(url, query_params=query_params) - self._upload_file_data_with_tus( + real_filename = self._upload_file_data_with_tus( url=url, filename=filename, meta=meta, pbar=pbar, logger=logger ) + query_params["filename"] = real_filename return self._tus_finish_upload(url, query_params=query_params, fields=fields) def _wait_for_completion( @@ -216,7 +219,9 @@ def _make_tus_uploader(api_client: ApiClient, url: str, **kwargs): return _MyTusUploader(client=client, api_client=api_client, **kwargs) - def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, logger=None): + def _upload_file_data_with_tus( + self, url, filename, *, meta=None, pbar=None, logger=None + ) -> str: file_size = filename.stat().st_size if pbar is None: pbar = NullProgressReporter() @@ -233,6 +238,7 @@ def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, log log_func=logger, ) tus_uploader.upload() + return tus_uploader.real_filename def _tus_start_upload(self, url, *, query_params=None): response = self._client.api_client.rest_client.POST( @@ -273,17 +279,21 @@ def upload_file_and_wait( ): url = self._client.api_map.make_endpoint_url(endpoint.path, kwsub=url_params) params = 
{"format": format_name, "filename": filename.name} - self.upload_file( + response = self.upload_file( url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]} ) + rq_id = json.loads(response.data).get("rq_id") + assert rq_id, "The rq_id was not found in the response" + params["rq_id"] = rq_id + self._wait_for_completion( url, success_status=201, positive_statuses=[202], status_check_period=status_check_period, query_params=params, - method="POST", + method="PUT", ) @@ -301,12 +311,17 @@ def upload_file_and_wait( ): url = self._client.api_map.make_endpoint_url(upload_endpoint.path, kwsub=url_params) params = {"format": format_name, "filename": filename.name} - self.upload_file( + response = self.upload_file( url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]} ) + rq_id = json.loads(response.data).get("rq_id") + assert rq_id, "The rq_id was not found in the response" url = self._client.api_map.make_endpoint_url(retrieve_endpoint.path, kwsub=url_params) - params = {"action": "import_status"} + params = { + "action": "import_status", + "rq_id": rq_id, + } self._wait_for_completion( url, success_status=201, diff --git a/cvat/apps/dataset_manager/project.py b/cvat/apps/dataset_manager/project.py index e52fb2ebab88..bf621fff7c9a 100644 --- a/cvat/apps/dataset_manager/project.py +++ b/cvat/apps/dataset_manager/project.py @@ -7,6 +7,7 @@ from tempfile import TemporaryDirectory import rq from typing import Any, Callable, List, Mapping, Tuple +from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError from django.db import transaction @@ -16,7 +17,7 @@ from cvat.apps.dataset_manager.task import TaskAnnotation from .annotation import AnnotationIR -from .bindings import ProjectData, load_dataset_data +from .bindings import ProjectData, load_dataset_data, CvatImportError from .formats.registry import make_exporter, make_importer def export_project(project_id, dst_file, format_name, @@ -160,7 +161,7 @@ def data(self) -> dict: raise NotImplementedError() @transaction.atomic -def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_to_poly): +def import_dataset_as_project(src_file, project_id, format_name, conv_mask_to_poly): rq_job = rq.get_current_job() rq_job.meta['status'] = 'Dataset import has been started...' rq_job.meta['progress'] = 0. 
@@ -170,5 +171,8 @@ def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_t project.init_from_db() importer = make_importer(format_name) - with open(dataset_file, 'rb') as f: - project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly) + with open(src_file, 'rb') as f: + try: + project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly) + except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex: + raise CvatImportError(str(ex)) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index c596db9920de..452a93505f90 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -8,6 +8,7 @@ from copy import deepcopy from enum import Enum from tempfile import TemporaryDirectory +from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError from django.db import transaction from django.db.models.query import Prefetch @@ -19,11 +20,10 @@ from cvat.apps.profiler import silk_profile from .annotation import AnnotationIR, AnnotationManager -from .bindings import JobData, TaskData +from .bindings import JobData, TaskData, CvatImportError from .formats.registry import make_exporter, make_importer from .util import bulk_create - class dotdict(OrderedDict): """dot.notation access to dictionary attributes""" __getattr__ = OrderedDict.get @@ -853,19 +853,25 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal task.export(f, exporter, host=server_url, save_images=save_images) @transaction.atomic -def import_task_annotations(task_id, src_file, format_name, conv_mask_to_poly): +def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly): task = TaskAnnotation(task_id) task.init_from_db() importer = make_importer(format_name) with open(src_file, 'rb') as f: - task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + try: + task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex: + raise CvatImportError(str(ex)) @transaction.atomic -def import_job_annotations(job_id, src_file, format_name, conv_mask_to_poly): +def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly): job = JobAnnotation(job_id) job.init_from_db() importer = make_importer(format_name) with open(src_file, 'rb') as f: - job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + try: + job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex: + raise CvatImportError(str(ex)) diff --git a/cvat/apps/dataset_manager/tests/test_formats.py b/cvat/apps/dataset_manager/tests/test_formats.py index 86ef91efc85b..e1fa84d874d3 100644 --- a/cvat/apps/dataset_manager/tests/test_formats.py +++ b/cvat/apps/dataset_manager/tests/test_formats.py @@ -923,8 +923,7 @@ def _test_can_import_annotations(self, task, import_format): expected_ann = TaskAnnotation(task["id"]) expected_ann.init_from_db() - dm.task.import_task_annotations(task["id"], - file_path, import_format, True) + dm.task.import_task_annotations(file_path, task["id"], import_format, True) actual_ann = TaskAnnotation(task["id"]) actual_ann.init_from_db() @@ -976,6 +975,6 @@ def test_can_import_mots_annotations_with_splited_masks(self): task.update() task = self._create_task(task, images) - dm.task.import_task_annotations(task['id'], dataset_path, format_name, True) + 
dm.task.import_task_annotations(dataset_path, task['id'], format_name, True) self._test_can_import_annotations(task, format_name) diff --git a/cvat/apps/dataset_manager/views.py b/cvat/apps/dataset_manager/views.py index ba133cc69953..5d652bf7285d 100644 --- a/cvat/apps/dataset_manager/views.py +++ b/cvat/apps/dataset_manager/views.py @@ -44,7 +44,6 @@ def get_export_cache_dir(db_instance): PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL / 3 JOB_CACHE_TTL = DEFAULT_CACHE_TTL - def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False): try: if task_id is not None: diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index 3f519edede39..18c28b6420d2 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -33,8 +33,10 @@ from cvat.apps.engine.log import slogger from cvat.apps.engine.serializers import (AttributeSerializer, DataSerializer, LabelSerializer, LabeledDataSerializer, SegmentSerializer, SimpleJobSerializer, TaskReadSerializer, - ProjectReadSerializer, ProjectFileSerializer, TaskFileSerializer) -from cvat.apps.engine.utils import av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta + ProjectReadSerializer, ProjectFileSerializer, TaskFileSerializer, RqIdSerializer) +from cvat.apps.engine.utils import ( + av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after +) from cvat.apps.engine.models import ( StorageChoice, StorageMethodChoice, DataChoice, Task, Project, Location) from cvat.apps.engine.task import JobFileMapping, _create_thread @@ -47,7 +49,6 @@ class Version(Enum): V1 = '1.0' - def _get_label_mapping(db_labels): label_mapping = {db_label.id: db_label.name for db_label in db_labels} for db_label in db_labels: @@ -869,7 +870,7 @@ def export(db_instance, request, queue_name): if os.path.exists(file_path): return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed: - exc_info = str(rq_job.exc_info) + exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @@ -896,6 +897,9 @@ def _download_file_from_bucket(db_storage, filename, key): def _import(importer, request, queue, rq_id, Serializer, file_field_name, location_conf, filename=None): rq_job = queue.fetch_job(rq_id) + if (user_id_from_meta := getattr(rq_job, 'meta', {}).get('user', {}).get('id')) and user_id_from_meta != request.user.id: + return Response(status=status.HTTP_403_FORBIDDEN) + if not rq_job: org_id = getattr(request.iam_context['organization'], 'id', None) dependent_job = None @@ -939,22 +943,25 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati filename=filename, key=key, request=request, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) rq_job = queue.enqueue_call( - func=importer, - args=(filename, request.user.id, org_id), + func=import_resource_with_clean_up_after, + args=(importer, filename, request.user.id, org_id), job_id=rq_id, meta={ 'tmp_file': filename, **get_rq_job_meta(request=request, db_obj=None) }, - depends_on=dependent_job + depends_on=dependent_job, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) else: if rq_job.is_finished: project_id = rq_job.return_value - os.remove(rq_job.meta['tmp_file']) rq_job.delete() return 
Response({'id': project_id}, status=status.HTTP_201_CREATED) elif rq_job.is_failed or \ @@ -971,7 +978,10 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati return Response(data=exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - return Response({'rq_id': rq_id}, status=status.HTTP_202_ACCEPTED) + serializer = RqIdSerializer(data={'rq_id': rq_id}) + serializer.is_valid(raise_exception=True) + + return Response(serializer.data, status=status.HTTP_202_ACCEPTED) def get_backup_dirname(): return settings.TMP_FILES_ROOT @@ -980,7 +990,7 @@ def import_project(request, queue_name, filename=None): if 'rq_id' in request.data: rq_id = request.data['rq_id'] else: - rq_id = f"import:project.{uuid.uuid4()}-by-{request.user}" + rq_id = get_import_rq_id('project', uuid.uuid4(), 'backup', request.user) Serializer = ProjectFileSerializer file_field_name = 'project_file' @@ -1003,10 +1013,8 @@ def import_project(request, queue_name, filename=None): ) def import_task(request, queue_name, filename=None): - if 'rq_id' in request.data: - rq_id = request.data['rq_id'] - else: - rq_id = f"import:task.{uuid.uuid4()}-by-{request.user}" + rq_id = request.data.get('rq_id', get_import_rq_id('task', uuid.uuid4(), 'backup', request.user)) + Serializer = TaskFileSerializer file_field_name = 'task_file' diff --git a/cvat/apps/engine/handlers.py b/cvat/apps/engine/handlers.py new file mode 100644 index 000000000000..3253957dd3e0 --- /dev/null +++ b/cvat/apps/engine/handlers.py @@ -0,0 +1,22 @@ +# Copyright (C) 2023 CVAT.ai Corporation +# +# SPDX-License-Identifier: MIT + +from pathlib import Path +from time import time +from django.conf import settings +from cvat.apps.engine.log import slogger + + +def clear_import_cache(path: Path, creation_time: float) -> None: + """ + This function checks and removes the import files if they have not been removed from rq import jobs. + This means that for some reason file was uploaded to CVAT server but rq import job was not created. 
+ + Args: + path (Path): path to file + creation_time (float): file creation time + """ + if path.is_file() and (time() - creation_time + 1) >= settings.IMPORT_CACHE_CLEAN_DELAY.total_seconds(): + path.unlink() + slogger.glob.warning(f"The file {str(path)} was removed from cleaning job.") diff --git a/cvat/apps/engine/location.py b/cvat/apps/engine/location.py index e463370bca6c..d2dc669f86ef 100644 --- a/cvat/apps/engine/location.py +++ b/cvat/apps/engine/location.py @@ -5,7 +5,7 @@ from enum import Enum from typing import Any, Dict -from cvat.apps.engine.models import Location +from cvat.apps.engine.models import Location, Job class StorageType(str, Enum): TARGET = 'target_storage' @@ -20,7 +20,7 @@ def get_location_configuration(obj, field_name: str, use_settings: bool = False) } if use_settings: - storage = getattr(obj, field_name) + storage = getattr(obj, field_name) if not isinstance(obj, Job) else getattr(obj.segment.task, field_name) if storage is None: location_conf['location'] = Location.LOCAL else: diff --git a/cvat/apps/engine/mixins.py b/cvat/apps/engine/mixins.py index 405dce1f5330..1906da6c9900 100644 --- a/cvat/apps/engine/mixins.py +++ b/cvat/apps/engine/mixins.py @@ -9,15 +9,21 @@ import uuid from dataclasses import asdict, dataclass from distutils.util import strtobool +from pathlib import Path +from tempfile import NamedTemporaryFile from unittest import mock +import django_rq from django.conf import settings from rest_framework import mixins, status from rest_framework.response import Response from cvat.apps.engine.location import StorageType, get_location_configuration +from cvat.apps.engine.log import slogger from cvat.apps.engine.models import Location from cvat.apps.engine.serializers import DataSerializer +from cvat.apps.engine.handlers import clear_import_cache +from cvat.apps.engine.utils import get_import_rq_id class TusFile: @@ -221,7 +227,27 @@ def init_tus_upload(self, request): if message_id: metadata["message_id"] = base64.b64decode(message_id) - file_exists = os.path.lexists(os.path.join(self.get_upload_dir(), filename)) + import_type = request.path.strip('/').split('/')[-1] + if import_type == 'backup': + # we need to create unique temp file here because + # users can try to import backups with the same name at the same time + with NamedTemporaryFile(prefix=f'cvat-backup-{filename}-by-{request.user}', suffix='.zip', dir=self.get_upload_dir()) as tmp_file: + filename = os.path.relpath(tmp_file.name, self.get_upload_dir()) + metadata['filename'] = filename + file_path = os.path.join(self.get_upload_dir(), filename) + file_exists = os.path.lexists(file_path) and import_type != 'backup' + + if file_exists: + # check whether the rq_job is in progress or has been finished/failed + object_class_name = self._object.__class__.__name__.lower() + template = get_import_rq_id(object_class_name, self._object.pk, import_type, request.user) + queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) + finished_job_ids = queue.finished_job_registry.get_job_ids() + failed_job_ids = queue.failed_job_registry.get_job_ids() + if template in finished_job_ids or template in failed_job_ids: + os.remove(file_path) + file_exists = False + if file_exists: return self._tus_response(status=status.HTTP_409_CONFLICT, data="File with same name already exists") @@ -231,11 +257,27 @@ def init_tus_upload(self, request): return self._tus_response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, data="File size exceeds max limit of {} bytes".format(self._tus_max_file_size)) + 
tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir()) location = request.build_absolute_uri() if 'HTTP_X_FORWARDED_HOST' not in request.META: location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO') + + if import_type in ('backup', 'annotations', 'datasets'): + scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.CLEANING.value) + path = Path(self.get_upload_dir()) / tus_file.filename + cleaning_job = scheduler.enqueue_in(time_delta=settings.IMPORT_CACHE_CLEAN_DELAY, + func=clear_import_cache, + path=path, + creation_time=Path(tus_file.file_path).stat().st_ctime + ) + slogger.glob.info( + f'The cleaning job {cleaning_job.id} is queued.' + f'The check that the file {path} is deleted will be carried out after ' + f'{settings.IMPORT_CACHE_CLEAN_DELAY}.' + ) + return self._tus_response( status=status.HTTP_201_CREATED, extra_headers={'Location': '{}{}'.format(location, tus_file.file_id), @@ -330,7 +372,7 @@ def export_annotations(self, request, db_obj, export_func, callback, get_data=No data = get_data(self._object.pk) return Response(data) - def import_annotations(self, request, db_obj, import_func, rq_func, rq_id): + def import_annotations(self, request, db_obj, import_func, rq_func, rq_id_template): is_tus_request = request.headers.get('Upload-Length', None) is not None or \ request.method == 'OPTIONS' if is_tus_request: @@ -352,7 +394,7 @@ def import_annotations(self, request, db_obj, import_func, rq_func, rq_id): return import_func( request=request, - rq_id=rq_id, + rq_id_template=rq_id_template, rq_func=rq_func, db_obj=self._object, format_name=format_name, diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 3fefec17b434..1d85679960f0 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -22,12 +22,10 @@ from cvat.apps.engine import models from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status from cvat.apps.engine.log import slogger -from cvat.apps.engine.utils import parse_specific_attributes +from cvat.apps.engine.utils import parse_specific_attributes, build_field_filter_params, get_list_view_name, reverse from drf_spectacular.utils import OpenApiExample, extend_schema_field, extend_schema_serializer -from cvat.apps.engine.utils import build_field_filter_params, get_list_view_name, reverse - class WriteOnceMixin: """ @@ -667,6 +665,9 @@ class RqStatusSerializer(serializers.Serializer): message = serializers.CharField(allow_blank=True, default="") progress = serializers.FloatField(max_value=100, default=0) +class RqIdSerializer(serializers.Serializer): + rq_id = serializers.CharField() + class JobFiles(serializers.ListField): """ diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index ef91c34441dc..da49ba70d698 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -1836,9 +1836,9 @@ def _run_api_v2_projects_id_dataset_import(self, pid, user, data, f): response = self.client.post("/api/projects/{}/dataset?format={}".format(pid, f), data=data, format="multipart") return response - def _run_api_v2_projects_id_dataset_import_status(self, pid, user): + def _run_api_v2_projects_id_dataset_import_status(self, pid, user, rq_id): with ForceLogin(user, self.client): - response = self.client.get("/api/projects/{}/dataset?action=import_status".format(pid), format="json") + response = 
self.client.get("/api/projects/{}/dataset?action=import_status&rq_id={}".format(pid, rq_id), format="json") return response def test_api_v2_projects_id_export_import(self): @@ -1867,7 +1867,8 @@ def test_api_v2_projects_id_export_import(self): response = self._run_api_v2_projects_id_dataset_import(pid_import, self.owner, import_data, "CVAT 1.1") self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) - response = self._run_api_v2_projects_id_dataset_import_status(pid_import, self.owner) + rq_id = response.data.get('rq_id') + response = self._run_api_v2_projects_id_dataset_import_status(pid_import, self.owner, rq_id) self.assertEqual(response.status_code, status.HTTP_201_CREATED) def tearDown(self): diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index c1a5eedc84c2..af1ee77bc9e8 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -9,17 +9,21 @@ import importlib import sys import traceback -from typing import Any, Dict, Optional +from contextlib import suppress +from typing import Any, Dict, Optional, Callable, Union import subprocess import os import urllib.parse +import logging import platform + +from rq.job import Job +from django_rq.queues import DjangoRQ from pathlib import Path from django.http.request import HttpRequest from django.utils import timezone from django.utils.http import urlencode - from rest_framework.reverse import reverse as _reverse from av import VideoFrame @@ -133,17 +137,29 @@ def parse_exception_message(msg): pass return parsed_msg -def process_failed_job(rq_job): - if os.path.exists(rq_job.meta['tmp_file']): - os.remove(rq_job.meta['tmp_file']) - exc_info = str(rq_job.exc_info or rq_job.dependency.exc_info) +def process_failed_job(rq_job: Job): + exc_info = str(rq_job.exc_info or getattr(rq_job.dependency, 'exc_info', None) or '') if rq_job.dependency: rq_job.dependency.delete() rq_job.delete() - return parse_exception_message(exc_info) - -def configure_dependent_job(queue, rq_id, rq_func, db_storage, filename, key, request): + msg = parse_exception_message(exc_info) + log = logging.getLogger('cvat.server.engine') + log.error(msg) + return msg + + +def configure_dependent_job( + queue: DjangoRQ, + rq_id: str, + rq_func: Callable[[Any, str, str], None], + db_storage: Any, + filename: str, + key: str, + request: HttpRequest, + result_ttl: float, + failure_ttl: float +) -> Job: rq_job_id_download_file = rq_id + f'?action=download_{filename}' rq_job_download_file = queue.fetch_job(rq_job_id_download_file) if not rq_job_download_file: @@ -153,6 +169,8 @@ def configure_dependent_job(queue, rq_id, rq_func, db_storage, filename, key, re args=(db_storage, filename, key), job_id=rq_job_id_download_file, meta=get_rq_job_meta(request=request, db_obj=db_storage), + result_ttl=result_ttl, + failure_ttl=failure_ttl ) return rq_job_download_file @@ -218,6 +236,27 @@ def get_list_view_name(model): 'model_name': model._meta.object_name.lower() } +def get_import_rq_id( + resource_type: str, + resource_id: int, + subresource_type: str, + user: str, +) -> str: + # import:---by- + return f"import:{resource_type}-{resource_id}-{subresource_type}-by-{user}" + +def import_resource_with_clean_up_after( + func: Union[Callable[[str, int, int], int], Callable[[str, int, str, bool], None]], + filename: str, + *args, + **kwargs, +) -> Any: + try: + result = func(filename, *args, **kwargs) + finally: + with suppress(FileNotFoundError): + os.remove(filename) + return result def get_cpu_number() -> int: cpu_number = None diff --git 
a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index d8c356943652..527ebae4eb20 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -8,6 +8,7 @@ import os.path as osp import pytz import traceback +import textwrap from datetime import datetime from distutils.util import strtobool from tempfile import NamedTemporaryFile @@ -62,12 +63,12 @@ UserSerializer, PluginsSerializer, IssueReadSerializer, IssueWriteSerializer, CommentReadSerializer, CommentWriteSerializer, CloudStorageWriteSerializer, CloudStorageReadSerializer, DatasetFileSerializer, - ProjectFileSerializer, TaskFileSerializer, CloudStorageContentSerializer) + ProjectFileSerializer, TaskFileSerializer, RqIdSerializer, CloudStorageContentSerializer) from cvat.apps.engine.view_utils import get_cloud_storage_for_import_or_export from utils.dataset_manifest import ImageManifestManager from cvat.apps.engine.utils import ( - av_scan_paths, process_failed_job, configure_dependent_job, parse_exception_message, get_rq_job_meta + av_scan_paths, process_failed_job, configure_dependent_job, parse_exception_message, get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after ) from cvat.apps.engine import backup from cvat.apps.engine.mixins import PartialUpdateModelMixin, UploadMixin, AnnotationMixin, SerializeMixin @@ -249,6 +250,7 @@ class ProjectViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, ordering = "-id" lookup_fields = {'owner': 'owner__username', 'assignee': 'assignee__username'} iam_organization_field = 'organization' + IMPORT_RQ_ID_TEMPLATE = get_import_rq_id('project', {}, 'dataset', {}) def get_serializer_class(self): if self.request.method in SAFE_METHODS: @@ -271,6 +273,14 @@ def perform_create(self, serializer, **kwargs): ) @extend_schema(methods=['GET'], summary='Export project as a dataset in a specific format', + description=textwrap.dedent(""" + To check the status of the process of importing a project dataset from a file: + + After initiating the dataset upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + GET /api/projects/id/dataset requests to track the status of the dataset import. + Also you should specify action parameter: action=import_status. 
+ """), parameters=[ OpenApiParameter('format', description='Desired output format name\n' 'You can get the list of supported formats at:\n/server/annotation/formats', @@ -287,6 +297,8 @@ def perform_create(self, serializer, **kwargs): OpenApiParameter('use_default_location', description='Use the location that was configured in project to import dataset', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL, required=False, default=True), + OpenApiParameter('rq_id', description='rq id', + location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), ], responses={ '200': OpenApiResponse(OpenApiTypes.BINARY, description='Download of file started'), @@ -294,7 +306,13 @@ def perform_create(self, serializer, **kwargs): '202': OpenApiResponse(description='Exporting has been started'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['POST'], summary='Import dataset in specific format as a project', + @extend_schema(methods=['POST'], + summary='Import dataset in specific format as a project or check status of dataset import process', + description=textwrap.dedent(""" + The request POST /api/projects/id/dataset will initiate file upload and will create + the rq job on the server in which the process of dataset import from a file + will be carried out. Please, use the GET /api/projects/id/dataset endpoint for checking status of the process. + """), parameters=[ OpenApiParameter('format', description='Desired dataset format name\n' 'You can get the list of supported formats at:\n/server/annotation/formats', @@ -315,7 +333,7 @@ def perform_create(self, serializer, **kwargs): resource_type_field_name=None ), responses={ - '202': OpenApiResponse(description='Importing has been started'), + '202': OpenApiResponse(RqIdSerializer, description='Importing has been started'), '400': OpenApiResponse(description='Failed to import dataset'), '405': OpenApiResponse(description='Format is not available'), }) @@ -323,7 +341,6 @@ def perform_create(self, serializer, **kwargs): url_path=r'dataset/?$', parser_classes=_UPLOAD_PARSER_CLASSES) def dataset(self, request, pk): self._object = self.get_object() # force call of check_object_permissions() - rq_id = f"import:dataset-for-project.id{pk}-by-{request.user}" if request.method in {'POST', 'OPTIONS'}: return self.import_annotations( @@ -331,17 +348,26 @@ def dataset(self, request, pk): db_obj=self._object, import_func=_import_project_dataset, rq_func=dm.project.import_dataset_as_project, - rq_id=rq_id, + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE ) else: action = request.query_params.get("action", "").lower() if action in ("import_status",): queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) + rq_id = request.query_params.get('rq_id') + if not rq_id: + return Response('The rq_id param should be specified in the query parameters', status=status.HTTP_400_BAD_REQUEST) + + # check that the user has access to the current rq_job + # We should not return any status of job including "404 not found" for user that has no access for this rq_job + + if self.IMPORT_RQ_ID_TEMPLATE.format(pk, request.user) != rq_id: + return Response(status=status.HTTP_403_FORBIDDEN) + rq_job = queue.fetch_job(rq_id) if rq_job is None: return Response(status=status.HTTP_404_NOT_FOUND) elif rq_job.is_finished: - os.remove(rq_job.meta['tmp_file']) if rq_job.dependency: rq_job.dependency.delete() rq_job.delete() @@ -394,7 +420,7 @@ def upload_finished(self, request): return _import_project_dataset( request=request, 
filename=uploaded_file, - rq_id=f"import:dataset-for-project.id{self._object.pk}-by-{request.user}", + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.project.import_dataset_as_project, db_obj=self._object, format_name=format_name, @@ -483,7 +509,19 @@ def annotations(self, request, pk): def export_backup(self, request, pk=None): return self.serialize(request, backup.export) - @extend_schema(summary='Methods create a project from a backup', + @extend_schema(methods=['POST'], summary='Methods create a project from a backup', + description=textwrap.dedent(""" + The backup import process is as follows: + + The first request POST /api/projects/backup will initiate file upload and will create + the rq job on the server in which the process of a project creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the project creation. + Once the project has been successfully created, the server will return the id of the newly created project. + """), parameters=[ *ORGANIZATION_OPEN_API_PARAMETERS, OpenApiParameter('location', description='Where to import the backup file from', @@ -493,14 +531,20 @@ def export_backup(self, request, pk=None): location=OpenApiParameter.QUERY, type=OpenApiTypes.NUMBER, required=False), OpenApiParameter('filename', description='Backup file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), + OpenApiParameter('rq_id', description='rq id', + location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), ], request=PolymorphicProxySerializer('BackupWrite', serializers=[ProjectFileSerializer, OpenApiTypes.NONE], resource_type_field_name=None ), + # TODO: for some reason the code generated by the openapi generator from schema with different serializers + # contains only one serializer, need to fix that. + # https://github.com/OpenAPITools/openapi-generator/issues/6126 responses={ - '201': OpenApiResponse(description='The project has been imported'), # or better specify {id: project_id} - '202': OpenApiResponse(description='Importing a backup file has been started'), + # 201: OpenApiResponse(inline_serializer("ImportedProjectIdSerializer", fields={"id": serializers.IntegerField(required=True)}) + '201': OpenApiResponse(description='The project has been imported'), + '202': OpenApiResponse(RqIdSerializer, description='Importing a backup file has been started'), }) @action(detail=False, methods=['OPTIONS', 'POST'], url_path=r'backup/?$', serializer_class=ProjectFileSerializer(required=False), @@ -699,6 +743,7 @@ class TaskViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, ordering_fields = list(filter_fields) ordering = "-id" iam_organization_field = 'organization' + IMPORT_RQ_ID_TEMPLATE = get_import_rq_id('task', {}, 'annotations', {}) def get_serializer_class(self): if self.request.method in SAFE_METHODS: @@ -716,6 +761,18 @@ def get_queryset(self): return queryset @extend_schema(summary='Method recreates a task from an attached task backup file', + description=textwrap.dedent(""" + The backup import process is as follows: + + The first request POST /api/tasks/backup will initiate file upload and will create + the rq job on the server in which the process of a task creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. 
+ Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the task creation. + Once the task has been successfully created, the server will return the id of the newly created task. + """), parameters=[ *ORGANIZATION_OPEN_API_PARAMETERS, OpenApiParameter('location', description='Where to import the backup file from', @@ -725,12 +782,19 @@ def get_queryset(self): location=OpenApiParameter.QUERY, type=OpenApiTypes.NUMBER, required=False), OpenApiParameter('filename', description='Backup file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), + OpenApiParameter('rq_id', description='rq id', + location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), ], request=TaskFileSerializer(required=False), + # TODO: for some reason the code generated by the openapi generator from schema with different serializers + # contains only one serializer, need to fix that. + # https://github.com/OpenAPITools/openapi-generator/issues/6126 responses={ - '201': OpenApiResponse(description='The task has been imported'), # or better specify {id: task_id} - '202': OpenApiResponse(description='Importing a backup file has been started'), + # 201: OpenApiResponse(inline_serializer("ImportedTaskIdSerializer", fields={"id": serializers.IntegerField(required=True)}) + '201': OpenApiResponse(description='The task has been imported'), + '202': OpenApiResponse(RqIdSerializer, description='Importing a backup file has been started'), }) + @action(detail=False, methods=['OPTIONS', 'POST'], url_path=r'backup/?$', serializer_class=TaskFileSerializer(required=False), parser_classes=_UPLOAD_PARSER_CLASSES) @@ -810,8 +874,7 @@ def upload_finished(self, request): return _import_annotations( request=request, filename=annotation_file, - rq_id=(f"import:annotations-for-task.id{self._object.pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_task_annotations, db_obj=self._object, format_name=format_name, @@ -962,10 +1025,19 @@ def append_data_chunk(self, request, pk, file_id): '400': OpenApiResponse(description='Exporting without data is not allowed'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['PUT'], summary='Method allows to upload task annotations', + @extend_schema(methods=['PUT'], summary='Method allows to upload task annotations or edit existing annotations', + description=textwrap.dedent(""" + To check the status of the process of uploading a task annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/tasks/id/annotations requests to track the status of the annotations upload. 
+ """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), + OpenApiParameter('rq_id', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, + description='rq id'), ], request=PolymorphicProxySerializer('TaskAnnotationsUpdate', serializers=[LabeledDataSerializer, AnnotationFileSerializer, OpenApiTypes.NONE], @@ -977,7 +1049,12 @@ def append_data_chunk(self, request, pk, file_id): '405': OpenApiResponse(description='Format is not available'), }) @extend_schema(methods=['POST'], - summary="Method allows to upload task annotations from a local file or a cloud storage", + summary="Method allows to initialize the process of upload task annotations from a local or a cloud storage file", + description=textwrap.dedent(""" + The request POST /api/tasks/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/tasks/id/annotations endpoint for checking status of the process. + """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), @@ -998,7 +1075,7 @@ def append_data_chunk(self, request, pk, file_id): ), responses={ '201': OpenApiResponse(description='Uploading has finished'), - '202': OpenApiResponse(description='Uploading has been started'), + '202': OpenApiResponse(RqIdSerializer, description='Uploading has been started'), '405': OpenApiResponse(description='Format is not available'), }) @extend_schema(methods=['PATCH'], summary='Method performs a partial update of annotations in a specific task', @@ -1031,17 +1108,19 @@ def annotations(self, request, pk): return Response(data="Exporting annotations from a task without data is not allowed", status=status.HTTP_400_BAD_REQUEST) elif request.method == 'POST' or request.method == 'OPTIONS': + # NOTE: initialization process of annotations import format_name = request.query_params.get('format', '') return self.import_annotations( request=request, db_obj=self._object, import_func=_import_annotations, rq_func=dm.task.import_task_annotations, - rq_id = f"import:annotations-for-task.id{pk}-in-{format_name.replace(' ', '_')}-by-{request.user}" + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE ) elif request.method == 'PUT': format_name = request.query_params.get('format', '') if format_name: + # NOTE: continue process of import annotations use_settings = strtobool(str(request.query_params.get('use_default_location', True))) conv_mask_to_poly = strtobool(request.query_params.get('conv_mask_to_poly', 'True')) obj = self._object if use_settings else request.query_params @@ -1050,7 +1129,7 @@ def annotations(self, request, pk): ) return _import_annotations( request=request, - rq_id = f"import:annotations-for-task.id{pk}-in-{format_name.replace(' ', '_')}-by-{request.user}", + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_task_annotations, db_obj=self._object, format_name=format_name, @@ -1274,6 +1353,7 @@ class JobViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, 'project_name': 'segment__task__project__name', 'assignee': 'assignee__username' } + IMPORT_RQ_ID_TEMPLATE = get_import_rq_id('job', {}, 'annotations', {}) def get_queryset(self): queryset = 
super().get_queryset() @@ -1292,24 +1372,21 @@ def get_serializer_class(self): # UploadMixin method def get_upload_dir(self): - task = self._object.segment.task - return task.get_tmp_dirname() + return self._object.get_tmp_dirname() # UploadMixin method def upload_finished(self, request): - task = self._object.segment.task if self.action == 'annotations': format_name = request.query_params.get("format", "") filename = request.query_params.get("filename", "") conv_mask_to_poly = strtobool(request.query_params.get('conv_mask_to_poly', 'True')) - tmp_dir = task.get_tmp_dirname() + tmp_dir = self.get_upload_dir() if os.path.isfile(os.path.join(tmp_dir, filename)): annotation_file = os.path.join(tmp_dir, filename) return _import_annotations( request=request, filename=annotation_file, - rq_id=(f"import:annotations-for-job.id{self._object.pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_job_annotations, db_obj=self._object, format_name=format_name, @@ -1352,7 +1429,13 @@ def upload_finished(self, request): '202': OpenApiResponse(description='Exporting has been started'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['POST'], summary='Method allows to upload job annotations', + @extend_schema(methods=['POST'], + summary='Method allows to initialize the process of the job annotation upload from a local file or a cloud storage', + description=textwrap.dedent(""" + The request POST /api/jobs/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/jobs/id/annotations endpoint for checking status of the process. + """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), @@ -1370,13 +1453,24 @@ def upload_finished(self, request): request=AnnotationFileSerializer, responses={ '201': OpenApiResponse(description='Uploading has finished'), - '202': OpenApiResponse(description='Uploading has been started'), + '202': OpenApiResponse(RqIdSerializer, description='Uploading has been started'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['PUT'], summary='Method performs an update of all annotations in a specific job', + @extend_schema(methods=['PUT'], + summary='Method performs an update of all annotations in a specific job ' + 'or used for uploading annotations from a file', + description=textwrap.dedent(""" + To check the status of the process of uploading a job annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/jobs/id/annotations requests to track the status of the annotations upload. 
+ """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), + OpenApiParameter('rq_id', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, + description='rq id'), ], request=PolymorphicProxySerializer( component_name='JobAnnotationsUpdate', @@ -1418,11 +1512,10 @@ def annotations(self, request, pk): format_name = request.query_params.get('format', '') return self.import_annotations( request=request, - db_obj=self._object.segment.task, + db_obj=self._object, import_func=_import_annotations, rq_func=dm.task.import_job_annotations, - rq_id=(f"import:annotations-for-job.id{self._object.pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE ) elif request.method == 'PUT': @@ -1436,8 +1529,7 @@ def annotations(self, request, pk): ) return _import_annotations( request=request, - rq_id=(f"import:annotations-for-job.id{pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_job_annotations, db_obj=self._object, format_name=format_name, @@ -2302,9 +2394,9 @@ def actions(self, request, pk): return HttpResponseBadRequest(msg) def rq_exception_handler(rq_job, exc_type, exc_value, tb): - rq_job.exc_info = "".join( + rq_job.meta["formatted_exception"] = "".join( traceback.format_exception_only(exc_type, exc_value)) - rq_job.save() + rq_job.save_meta() return True @@ -2315,8 +2407,9 @@ def _download_file_from_bucket(db_storage, filename, key): with open(filename, 'wb+') as f: f.write(data.getbuffer()) -def _import_annotations(request, rq_id, rq_func, db_obj, format_name, +def _import_annotations(request, rq_id_template, rq_func, db_obj, format_name, filename=None, location_conf=None, conv_mask_to_poly=True): + format_desc = {f.DISPLAY_NAME: f for f in dm.views.get_import_formats()}.get(format_name) if format_desc is None: @@ -2325,9 +2418,25 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, elif not format_desc.ENABLED: return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED) + rq_id = request.query_params.get('rq_id') + rq_id_should_be_checked = bool(rq_id) + if not rq_id: + rq_id = rq_id_template.format(db_obj.pk, request.user) + queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) rq_job = queue.fetch_job(rq_id) + if rq_id_should_be_checked and rq_id_template.format(db_obj.pk, request.user) != rq_id: + return Response(status=status.HTTP_403_FORBIDDEN) + + if rq_job and request.method == 'POST': + # If there is a previous job that has not been deleted + if rq_job.is_finished or rq_job.is_failed: + rq_job.delete() + rq_job = queue.fetch_job(rq_id) + else: + return Response(status=status.HTTP_409_CONFLICT, data='Import job already exists') + if not rq_job: # If filename is specified we consider that file was uploaded via TUS, so it exists in filesystem # Then we dont need to create temporary file @@ -2375,6 +2484,8 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, filename=filename, key=key, request=request, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) av_scan_paths(filename) @@ -2382,15 +2493,20 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, 'tmp_file': filename, } rq_job = queue.enqueue_call( - func=rq_func, - 
args=(db_obj.pk, filename, format_name, conv_mask_to_poly), + func=import_resource_with_clean_up_after, + args=(rq_func, filename, db_obj.pk, format_name, conv_mask_to_poly), job_id=rq_id, depends_on=dependent_job, - meta={**meta, **get_rq_job_meta(request=request, db_obj=db_obj)} + meta={**meta, **get_rq_job_meta(request=request, db_obj=db_obj)}, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) + serializer = RqIdSerializer(data={'rq_id': rq_id}) + serializer.is_valid(raise_exception=True) + + return Response(serializer.data, status=status.HTTP_202_ACCEPTED) else: if rq_job.is_finished: - os.remove(rq_job.meta['tmp_file']) rq_job.delete() return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed or \ @@ -2399,9 +2515,8 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, # RQ adds a prefix with exception class name import_error_prefix = '{}.{}'.format( CvatImportError.__module__, CvatImportError.__name__) - if exc_info.startswith(import_error_prefix): - exc_info = exc_info.replace(import_error_prefix + ': ', '') - return Response(data=exc_info, + if import_error_prefix in exc_info: + return Response(data="The annotations that were uploaded are not correct", status=status.HTTP_400_BAD_REQUEST) else: return Response(data=exc_info, @@ -2481,7 +2596,7 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba if osp.exists(file_path): return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed: - exc_info = str(rq_job.exc_info) + exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @@ -2509,7 +2624,7 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba result_ttl=ttl, failure_ttl=ttl) return Response(status=status.HTTP_202_ACCEPTED) -def _import_project_dataset(request, rq_id, rq_func, db_obj, format_name, filename=None, conv_mask_to_poly=True, location_conf=None): +def _import_project_dataset(request, rq_id_template, rq_func, db_obj, format_name, filename=None, conv_mask_to_poly=True, location_conf=None): format_desc = {f.DISPLAY_NAME: f for f in dm.views.get_import_formats()}.get(format_name) if format_desc is None: @@ -2518,10 +2633,17 @@ def _import_project_dataset(request, rq_id, rq_func, db_obj, format_name, filena elif not format_desc.ENABLED: return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED) + rq_id = rq_id_template.format(db_obj.pk, request.user) + queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) rq_job = queue.fetch_job(rq_id) - if not rq_job: + if not rq_job or rq_job.is_finished or rq_job.is_failed: + if rq_job and (rq_job.is_finished or rq_job.is_failed): + # for some reason the previous job has not been deleted + # (e.g the user closed the browser tab when job has been created + # but no one requests for checking status were not made) + rq_job.delete() dependent_job = None location = location_conf.get('location') if location_conf else None if not filename and location != Location.CLOUD_STORAGE: @@ -2563,19 +2685,26 @@ def _import_project_dataset(request, rq_id, rq_func, db_obj, format_name, filena filename=filename, key=key, request=request, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) rq_job = queue.enqueue_call( - func=rq_func, - args=(db_obj.pk, filename, format_name, 
conv_mask_to_poly), + func=import_resource_with_clean_up_after, + args=(rq_func, filename, db_obj.pk, format_name, conv_mask_to_poly), job_id=rq_id, meta={ 'tmp_file': filename, **get_rq_job_meta(request=request, db_obj=db_obj), }, depends_on=dependent_job, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) else: return Response(status=status.HTTP_409_CONFLICT, data='Import job already exists') - return Response(status=status.HTTP_202_ACCEPTED) + serializer = RqIdSerializer(data={'rq_id': rq_id}) + serializer.is_valid(raise_exception=True) + + return Response(serializer.data, status=status.HTTP_202_ACCEPTED) diff --git a/cvat/apps/events/export.py b/cvat/apps/events/export.py index ef464b5b7da9..90aca64db8be 100644 --- a/cvat/apps/events/export.py +++ b/cvat/apps/events/export.py @@ -148,7 +148,7 @@ def export(request, filter_query, queue_name): if os.path.exists(file_path): return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed: - exc_info = str(rq_job.exc_info) + exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) diff --git a/cvat/requirements/base.in b/cvat/requirements/base.in index 933548869798..7c773219336f 100644 --- a/cvat/requirements/base.in +++ b/cvat/requirements/base.in @@ -50,7 +50,7 @@ diskcache==5.4.0 boto3==1.17.61 azure-storage-blob==12.13.0 google-cloud-storage==1.42.0 -git+https://github.com/cvat-ai/datumaro.git@0817144ade1ddc514e182ca1835e322cb9af00a0 +git+https://github.com/cvat-ai/datumaro.git@ff83c00c2c1bc4b8fdfcc55067fcab0a9b5b6b11 urllib3>=1.26.5 # not directly required, pinned by Snyk to avoid a vulnerability natsort==8.0.0 mistune>=2.0.1 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/cvat/requirements/base.txt b/cvat/requirements/base.txt index d95148f72979..5df59babe6d1 100644 --- a/cvat/requirements/base.txt +++ b/cvat/requirements/base.txt @@ -1,4 +1,4 @@ -# SHA1:53feeaa402abed516aad4a640244c5fd1bff765a +# SHA1:d1435558d66ec49d0c691492b2f3798960ca3bba # # This file is autogenerated by pip-compile-multi # To update, run: @@ -66,7 +66,7 @@ cryptography==40.0.2 # pyjwt cycler==0.11.0 # via matplotlib -datumaro @ git+https://github.com/cvat-ai/datumaro.git@0817144ade1ddc514e182ca1835e322cb9af00a0 +datumaro @ git+https://github.com/cvat-ai/datumaro.git@ff83c00c2c1bc4b8fdfcc55067fcab0a9b5b6b11 # via -r cvat/requirements/base.in defusedxml==0.7.1 # via diff --git a/cvat/schema.yml b/cvat/schema.yml index b7d2997b2c58..97a1f1cc6666 100644 --- a/cvat/schema.yml +++ b/cvat/schema.yml @@ -1681,7 +1681,13 @@ paths: description: Format is not available post: operationId: jobs_create_annotations - summary: Method allows to upload job annotations + description: |2 + + The request POST /api/jobs/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/jobs/id/annotations endpoint for checking status of the process. 
+ summary: Method allows to initialize the process of the job annotation upload + from a local file or a cloud storage parameters: - in: query name: cloud_storage_id @@ -1742,12 +1748,24 @@ paths: '201': description: Uploading has finished '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Uploading has been started '405': description: Format is not available put: operationId: jobs_update_annotations - summary: Method performs an update of all annotations in a specific job + description: |2 + + To check the status of the process of uploading a job annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/jobs/id/annotations requests to track the status of the annotations upload. + summary: Method performs an update of all annotations in a specific job or used + for uploading annotations from a file parameters: - in: query name: format @@ -1763,6 +1781,11 @@ paths: type: integer description: A unique integer value identifying this job. required: true + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - jobs requestBody: @@ -3059,6 +3082,14 @@ paths: /api/projects/{id}/dataset/: get: operationId: projects_retrieve_dataset + description: |2 + + To check the status of the process of importing a project dataset from a file: + + After initiating the dataset upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + GET /api/projects/id/dataset requests to track the status of the dataset import. + Also you should specify action parameter: action=import_status. summary: Export project as a dataset in a specific format parameters: - in: query @@ -3102,6 +3133,11 @@ paths: - cloud_storage - local description: Where need to save downloaded dataset + - in: query + name: rq_id + schema: + type: string + description: rq id - in: query name: use_default_location schema: @@ -3132,7 +3168,13 @@ paths: description: Format is not available post: operationId: projects_create_dataset - summary: Import dataset in specific format as a project + description: |2 + + The request POST /api/projects/id/dataset will initiate file upload and will create + the rq job on the server in which the process of dataset import from a file + will be carried out. Please, use the GET /api/projects/id/dataset endpoint for checking status of the process. + summary: Import dataset in specific format as a project or check status of dataset + import process parameters: - in: query name: cloud_storage_id @@ -3191,6 +3233,10 @@ paths: - basicAuth: [] responses: '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Importing has been started '400': description: Failed to import dataset @@ -3223,6 +3269,18 @@ paths: /api/projects/backup/: post: operationId: projects_create_backup + description: |2 + + The backup import process is as follows: + + The first request POST /api/projects/backup will initiate file upload and will create + the rq job on the server in which the process of a project creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the project creation. 
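The descriptions above define a two-step contract for annotation uploads: the initiating POST answers 202 with an `rq_id`, and every follow-up status request has to echo that id back as a query parameter. A rough client-side illustration for a job (a sketch using `requests`; the URL, credentials, file and the `annotation_file` field name are placeholders or assumptions, not taken from this hunk):

```python
import time

import requests

BASE_URL = "http://localhost:8080/api"   # placeholder server
AUTH = ("user", "password")              # placeholder credentials
params = {"format": "COCO 1.0", "filename": "annotations.zip"}

# Step 1: initiate the upload; the 202 response now carries the RqId payload.
with open("annotations.zip", "rb") as f:
    response = requests.post(
        f"{BASE_URL}/jobs/1/annotations",
        params=params,
        files={"annotation_file": f},
        auth=AUTH,
    )
response.raise_for_status()
rq_id = response.json()["rq_id"]

# Step 2: poll the PUT endpoint with the same rq_id until the import finishes.
while True:
    status = requests.put(
        f"{BASE_URL}/jobs/1/annotations",
        params={**params, "rq_id": rq_id},
        auth=AUTH,
    )
    if status.status_code == 201:        # annotations have been imported
        break
    assert status.status_code == 202, status.text
    time.sleep(1)
```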
+ Once the project has been successfully created, the server will return the id of the newly created project. summary: Methods create a project from a backup parameters: - in: header @@ -3259,6 +3317,11 @@ paths: schema: type: integer description: Organization identifier + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - projects requestBody: @@ -3279,6 +3342,10 @@ paths: '201': description: The project has been imported '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Importing a backup file has been started /api/schema/: get: @@ -3835,8 +3902,13 @@ paths: description: Format is not available post: operationId: tasks_create_annotations - summary: Method allows to upload task annotations from a local file or a cloud - storage + description: |2 + + The request POST /api/tasks/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/tasks/id/annotations endpoint for checking status of the process. + summary: Method allows to initialize the process of upload task annotations + from a local or a cloud storage file parameters: - in: query name: cloud_storage_id @@ -3896,12 +3968,23 @@ paths: '201': description: Uploading has finished '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Uploading has been started '405': description: Format is not available put: operationId: tasks_update_annotations - summary: Method allows to upload task annotations + description: |2 + + To check the status of the process of uploading a task annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/tasks/id/annotations requests to track the status of the annotations upload. + summary: Method allows to upload task annotations or edit existing annotations parameters: - in: query name: format @@ -3917,6 +4000,11 @@ paths: type: integer description: A unique integer value identifying this task. required: true + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - tasks requestBody: @@ -4341,6 +4429,18 @@ paths: /api/tasks/backup/: post: operationId: tasks_create_backup + description: |2 + + The backup import process is as follows: + + The first request POST /api/tasks/backup will initiate file upload and will create + the rq job on the server in which the process of a task creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the task creation. + Once the task has been successfully created, the server will return the id of the newly created task. 
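Backups follow the same idea but are polled through the POST endpoint itself, exactly as the description above states: the first request uploads the file and returns an `rq_id`, and subsequent empty POSTs carry that id until the server answers 201 with the id of the newly created object. A hedged sketch for a task backup (placeholder values; the `task_file` field name is an assumption):

```python
import time

import requests

BASE_URL = "http://localhost:8080/api"   # placeholder server
AUTH = ("user", "password")              # placeholder credentials

with open("task_backup.zip", "rb") as f:
    response = requests.post(
        f"{BASE_URL}/tasks/backup",
        files={"task_file": f},           # field name assumed, not taken from this hunk
        auth=AUTH,
    )

rq_id = None
while response.status_code == 202:
    if rq_id is None:
        # Only the initiating request is expected to contain the rq_id.
        rq_id = response.json()["rq_id"]
    time.sleep(1)
    response = requests.post(f"{BASE_URL}/tasks/backup", params={"rq_id": rq_id}, auth=AUTH)

response.raise_for_status()
new_task_id = response.json()["id"]       # id of the newly created task
```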
summary: Method recreates a task from an attached task backup file parameters: - in: header @@ -4377,6 +4477,11 @@ paths: schema: type: integer description: Organization identifier + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - tasks requestBody: @@ -4398,6 +4503,10 @@ paths: '201': description: The task has been imported '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Importing a backup file has been started /api/users: get: @@ -7339,6 +7448,13 @@ components: * `supervisor` - Supervisor * `maintainer` - Maintainer * `owner` - Owner + RqId: + type: object + properties: + rq_id: + type: string + required: + - rq_id RqStatus: type: object properties: diff --git a/cvat/settings/base.py b/cvat/settings/base.py index 8c24978a9220..4b04ea8d79e0 100644 --- a/cvat/settings/base.py +++ b/cvat/settings/base.py @@ -21,6 +21,7 @@ import shutil import subprocess import sys +from datetime import timedelta from distutils.util import strtobool from enum import Enum @@ -294,6 +295,7 @@ class CVAT_QUEUES(Enum): AUTO_ANNOTATION = 'annotation' WEBHOOKS = 'webhooks' NOTIFICATIONS = 'notifications' + CLEANING = 'cleaning' RQ_QUEUES = { CVAT_QUEUES.IMPORT_DATA.value: { @@ -326,6 +328,12 @@ class CVAT_QUEUES(Enum): 'DB': 0, 'DEFAULT_TIMEOUT': '1h' }, + CVAT_QUEUES.CLEANING.value: { + 'HOST': 'localhost', + 'PORT': 6379, + 'DB': 0, + 'DEFAULT_TIMEOUT': '1h' + }, } NUCLIO = { @@ -346,7 +354,6 @@ class CVAT_QUEUES(Enum): 'cvat.apps.events.handlers.handle_rq_exception', ] - # JavaScript and CSS compression # https://django-compressor.readthedocs.io @@ -667,3 +674,7 @@ class CVAT_QUEUES(Enum): } BUCKET_CONTENT_MAX_PAGE_SIZE = 500 + +IMPORT_CACHE_FAILED_TTL = timedelta(days=90) +IMPORT_CACHE_SUCCESS_TTL = timedelta(hours=1) +IMPORT_CACHE_CLEAN_DELAY = timedelta(hours=2) diff --git a/cvat/settings/testing_rest.py b/cvat/settings/testing_rest.py index 36bf80dc0a2b..5fb329732f29 100644 --- a/cvat/settings/testing_rest.py +++ b/cvat/settings/testing_rest.py @@ -10,3 +10,5 @@ PASSWORD_HASHERS = [ "django.contrib.auth.hashers.MD5PasswordHasher", ] + +IMPORT_CACHE_CLEAN_DELAY = timedelta(seconds=30) diff --git a/docker-compose.yml b/docker-compose.yml index dc46e4a320e0..a91b002e7e4d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -77,6 +77,7 @@ services: DJANGO_LOG_SERVER_HOST: vector DJANGO_LOG_SERVER_PORT: 80 no_proxy: clickhouse,grafana,vector,nuclio,opa,${no_proxy:-} + NUMPROCS: 1 command: -c supervisord/utils.conf volumes: - cvat_data:/home/django/data diff --git a/supervisord/utils.conf b/supervisord/utils.conf index 2e53b6f1de33..925ada37324f 100644 --- a/supervisord/utils.conf +++ b/supervisord/utils.conf @@ -41,3 +41,12 @@ command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -i " environment=VECTOR_EVENT_HANDLER="SynchronousLogstashHandler" numprocs=1 + +[program:rqworker_cleaning] +command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic " \ + exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 cleaning \ + --worker-class cvat.rqworker.DefaultWorker \ + " +environment=SSH_AUTH_SOCK="/tmp/ssh-agent.sock",VECTOR_EVENT_HANDLER="SynchronousLogstashHandler" +numprocs=%(ENV_NUMPROCS)s +process_name=rqworker_cleaning_%(process_num)s \ No newline at end of file diff --git a/tests/python/pytest.ini b/tests/python/pytest.ini index 653ae999256e..05cda52273da 100644 --- a/tests/python/pytest.ini +++ b/tests/python/pytest.ini @@ -8,4 +8,3 @@ timeout = 15 markers = 
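The new `cleaning` queue, the `rqworker_cleaning` supervisor program and the `IMPORT_CACHE_*` settings added above are the moving parts behind the deferred removal of uploaded files. The scheduling call itself is not part of these hunks; a hypothetical sketch of how a file could be handed to that worker (function names are illustrative, only the queue name and the `IMPORT_CACHE_CLEAN_DELAY` setting come from this patch):

```python
import os
from contextlib import suppress

import django_rq
from django.conf import settings


def remove_resource_if_still_present(path: str) -> None:
    # Executed by the rqworker_cleaning process: drop the uploaded file if the
    # import never consumed (and removed) it.
    with suppress(FileNotFoundError):
        os.remove(path)


def schedule_upload_clean_up(path: str) -> None:
    # Defer the check by IMPORT_CACHE_CLEAN_DELAY (2 h by default, 30 s under
    # cvat.settings.testing_rest) on the dedicated cleaning queue.
    scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.CLEANING.value)
    scheduler.enqueue_in(settings.IMPORT_CACHE_CLEAN_DELAY, remove_resource_if_still_present, path)
```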
with_external_services: The test requires services extrernal to the default CVAT deployment, e.g. a Git server etc. - diff --git a/tests/python/rest_api/test_projects.py b/tests/python/rest_api/test_projects.py index 9c335aa74513..40def2386124 100644 --- a/tests/python/rest_api/test_projects.py +++ b/tests/python/rest_api/test_projects.py @@ -473,11 +473,13 @@ def _test_import_project(self, username, project_id, format_name, data): _content_type="multipart/form-data", ) assert response.status == HTTPStatus.ACCEPTED + rq_id = json.loads(response.data).get("rq_id") + assert rq_id, "The rq_id was not found in the response" while True: # TODO: It's better be refactored to a separate endpoint to get request status (_, response) = api_client.projects_api.retrieve_dataset( - project_id, action="import_status" + project_id, action="import_status", rq_id=rq_id ) if response.status == HTTPStatus.CREATED: break diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index 77c1baaac04c..59e118a91a9b 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -11,8 +11,10 @@ from functools import partial from http import HTTPStatus from itertools import chain, product +from math import ceil from pathlib import Path -from tempfile import TemporaryDirectory +from tempfile import NamedTemporaryFile, TemporaryDirectory +from time import sleep, time from typing import List, Optional import pytest @@ -21,10 +23,12 @@ from cvat_sdk.api_client.api_client import ApiClient, Endpoint from cvat_sdk.core.helpers import get_paginated_collection from cvat_sdk.core.proxies.tasks import ResourceType, Task +from cvat_sdk.core.uploading import Uploader from deepdiff import DeepDiff from PIL import Image import shared.utils.s3 as s3 +from shared.fixtures.init import docker_exec_cvat, kube_exec_cvat from shared.utils.config import ( BASE_URL, USER_PASS, @@ -1726,3 +1730,110 @@ def test_can_report_correct_completed_jobs_count(tasks, jobs, admin_user): task, _ = api_client.tasks_api.retrieve(task["id"]) assert task.jobs.completed == 1 + + +class TestImportTaskAnnotations: + def _make_client(self) -> Client: + return Client(BASE_URL, config=Config(status_check_period=0.01)) + + @pytest.fixture(autouse=True) + def setup(self, restore_db_per_function, tmp_path: Path, admin_user: str): + self.tmp_dir = tmp_path + self.client = self._make_client() + self.user = admin_user + self.format = "COCO 1.0" + + with self.client: + self.client.login((self.user, USER_PASS)) + + def _check_annotations(self, task_id): + with make_api_client(self.user) as api_client: + (_, response) = api_client.tasks_api.retrieve_annotations(id=task_id) + assert response.status == HTTPStatus.OK + annotations = json.loads(response.data)["shapes"] + assert len(annotations) > 0 + + def _delete_annotations(self, task_id): + with make_api_client(self.user) as api_client: + (_, response) = api_client.tasks_api.destroy_annotations(id=task_id) + assert response.status == HTTPStatus.NO_CONTENT + + @pytest.mark.timeout(64) + @pytest.mark.parametrize("successful_upload", [True, False]) + def test_can_import_annotations_after_previous_unclear_import( + self, successful_upload: bool, tasks_with_shapes + ): + task_id = tasks_with_shapes[0]["id"] + self._check_annotations(task_id) + + with NamedTemporaryFile() as f: + filename = self.tmp_dir / f"task_{task_id}_{Path(f.name).name}_coco.zip" + + task = self.client.tasks.retrieve(task_id) + task.export_dataset(self.format, filename, include_images=False) + + 
self._delete_annotations(task_id) + + params = {"format": self.format, "filename": filename.name} + url = self.client.api_map.make_endpoint_url( + self.client.api_client.tasks_api.create_annotations_endpoint.path + ).format(id=task_id) + uploader = Uploader(self.client) + + if successful_upload: + # define time required to upload file with annotations + start_time = time() + task.import_annotations(self.format, filename) + required_time = ceil(time() - start_time) * 2 + self._delete_annotations(task_id) + + response = uploader.upload_file( + url, filename, meta=params, query_params=params, logger=self.client.logger.debug + ) + rq_id = json.loads(response.data)["rq_id"] + assert rq_id + else: + required_time = 54 + uploader._tus_start_upload(url, query_params=params) + uploader._upload_file_data_with_tus( + url, filename, meta=params, logger=self.client.logger.debug + ) + + sleep(required_time) + if successful_upload: + self._check_annotations(task_id) + self._delete_annotations(task_id) + task.import_annotations(self.format, filename) + self._check_annotations(task_id) + + @pytest.mark.timeout(64) + def test_check_import_cache_after_previous_interrupted_upload(self, tasks_with_shapes, request): + task_id = tasks_with_shapes[0]["id"] + with NamedTemporaryFile() as f: + filename = self.tmp_dir / f"task_{task_id}_{Path(f.name).name}_coco.zip" + task = self.client.tasks.retrieve(task_id) + task.export_dataset(self.format, filename, include_images=False) + + params = {"format": self.format, "filename": filename.name} + url = self.client.api_map.make_endpoint_url( + self.client.api_client.tasks_api.create_annotations_endpoint.path + ).format(id=task_id) + + uploader = Uploader(self.client) + uploader._tus_start_upload(url, query_params=params) + uploader._upload_file_data_with_tus( + url, filename, meta=params, logger=self.client.logger.debug + ) + number_of_files = 1 + sleep(30) # wait when the cleaning job from rq worker will be started + command = ["/bin/bash", "-c", f"ls data/tasks/{task_id}/tmp | wc -l"] + platform = request.config.getoption("--platform") + assert platform in ("kube", "local") + func = docker_exec_cvat if platform == "local" else kube_exec_cvat + for _ in range(12): + sleep(2) + result, _ = func(command) + number_of_files = int(result) + if not number_of_files: + break + assert not number_of_files diff --git a/tests/python/sdk/test_tasks.py b/tests/python/sdk/test_tasks.py index 3a8faeddef45..59069a4f80ce 100644 --- a/tests/python/sdk/test_tasks.py +++ b/tests/python/sdk/test_tasks.py @@ -367,7 +367,7 @@ def _test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path assert task.id assert task.id != fxt_new_task.id assert task.size == fxt_new_task.size - assert "imported sucessfully" in self.logger_stream.getvalue() + assert "imported successfully" in self.logger_stream.getvalue() assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1] assert self.stdout.getvalue() == "" diff --git a/tests/python/shared/fixtures/init.py b/tests/python/shared/fixtures/init.py index c2fd87671521..792fe36825f2 100644 --- a/tests/python/shared/fixtures/init.py +++ b/tests/python/shared/fixtures/init.py @@ -8,6 +8,7 @@ from pathlib import Path from subprocess import PIPE, CalledProcessError, run from time import sleep +from typing import List, Union import pytest import requests @@ -23,7 +24,6 @@ CONTAINER_NAME_FILES = ["docker-compose.tests.yml"] - DC_FILES = [ "docker-compose.dev.yml", "tests/docker-compose.file_share.yml", @@ -85,7 +85,7 @@ def _run(command, 
capture_output=True): proc = run(_command, check=True) # nosec return stdout, stderr except CalledProcessError as exc: - stderr = exc.stderr.decode() if capture_output else "see above" + stderr = exc.stderr.decode() or exc.stdout.decode() if capture_output else "see above" pytest.exit( f"Command failed: {command}.\n" f"Error message: {stderr}.\n" @@ -120,13 +120,17 @@ def kube_cp(source, target): _run(f"kubectl cp {source} {target}") -def docker_exec_cvat(command): - _run(f"docker exec {PREFIX}_cvat_server_1 {command}") +def docker_exec_cvat(command: Union[List[str], str]): + base = f"docker exec {PREFIX}_cvat_server_1" + _command = f"{base} {command}" if isinstance(command, str) else base.split() + command + return _run(_command) -def kube_exec_cvat(command): +def kube_exec_cvat(command: Union[List[str], str]): pod_name = _kube_get_server_pod_name() - _run(f"kubectl exec {pod_name} -- {command}") + base = f"kubectl exec {pod_name} --" + _command = f"{base} {command}" if isinstance(command, str) else base.split() + command + return _run(_command) def docker_exec_cvat_db(command): @@ -211,7 +215,7 @@ def create_compose_files(container_name_files): for service_name, service_config in dc_config["services"].items(): service_config.pop("container_name", None) - if service_name == "cvat_server": + if service_name in ("cvat_server", "cvat_utils"): service_env = service_config["environment"] service_env["DJANGO_SETTINGS_MODULE"] = "cvat.settings.testing_rest" diff --git a/tests/python/shared/utils/config.py b/tests/python/shared/utils/config.py index f5a3206c5aff..0e9669fce5d0 100644 --- a/tests/python/shared/utils/config.py +++ b/tests/python/shared/utils/config.py @@ -58,6 +58,10 @@ def post_files_method(username, endpoint, data, files, **kwargs): ) +def put_method(username, endpoint, data, **kwargs): + return requests.put(get_api_url(endpoint, **kwargs), json=data, auth=(username, USER_PASS)) + + def server_get(username, endpoint, **kwargs): return requests.get(get_server_url(endpoint, **kwargs), auth=(username, USER_PASS)) diff --git a/tests/python/shared/utils/resource_import_export.py b/tests/python/shared/utils/resource_import_export.py index 5adf8aecf7c4..9ee8bdec26e7 100644 --- a/tests/python/shared/utils/resource_import_export.py +++ b/tests/python/shared/utils/resource_import_export.py @@ -9,7 +9,7 @@ T = TypeVar("T") -from shared.utils.config import get_method, post_method +from shared.utils.config import get_method, post_method, put_method FILENAME_TEMPLATE = "cvat/{}/{}.zip" EXPORT_FORMAT = "CVAT for images 1.1" @@ -117,9 +117,16 @@ def _import_annotations_from_cloud_storage( response = post_method(user, url, data=None, **kwargs) status = response.status_code + # Only the first POST request contains rq_id in response. + # Exclude cases with 403 expected status. + rq_id = None + if status == HTTPStatus.ACCEPTED: + rq_id = response.json().get("rq_id") + assert rq_id, "The rq_id was not found in the response" + while status != _expect_status: assert status == HTTPStatus.ACCEPTED - response = post_method(user, url, data=None, **kwargs) + response = put_method(user, url, data=None, rq_id=rq_id, **kwargs) status = response.status_code if _check_uploaded: @@ -154,9 +161,16 @@ def _import_dataset_from_cloud_storage( response = post_method(user, url, data=None, **kwargs) status = response.status_code + # Only the first POST request contains rq_id in response. + # Exclude cases with 403 expected status. 
+ rq_id = None + if status == HTTPStatus.ACCEPTED: + rq_id = response.json().get("rq_id") + assert rq_id, "The rq_id was not found in the response" + while status != _expect_status: assert status == HTTPStatus.ACCEPTED - response = get_method(user, url, action="import_status") + response = get_method(user, url, action="import_status", rq_id=rq_id) status = response.status_code def _import_resource(self, cloud_storage: Dict[str, Any], resource_type: str, *args, **kwargs): diff --git a/tests/values.test.yaml b/tests/values.test.yaml index a4d90fc12428..e281ecb32143 100644 --- a/tests/values.test.yaml +++ b/tests/values.test.yaml @@ -14,6 +14,10 @@ cvat: - mountPath: /home/django/share name: cvat-backend-data subPath: share + utils: + additionalEnv: + - name: DJANGO_SETTINGS_MODULE + value: cvat.settings.testing_rest # Images are already present in the node imagePullPolicy: Never frontend:
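The updated cloud-storage helper just above uses the same handshake for project datasets: a single POST starts the import and yields the `rq_id`, after which the status is read from `GET /api/projects/{id}/dataset` with `action=import_status`. A compact illustration with placeholder values:

```python
import time

import requests

BASE_URL = "http://localhost:8080/api"   # placeholder server
AUTH = ("user", "password")              # placeholder credentials
params = {
    "format": "COCO 1.0",
    "location": "cloud_storage",
    "cloud_storage_id": 1,                # placeholder storage id
    "filename": "dataset.zip",
}

response = requests.post(f"{BASE_URL}/projects/1/dataset", params=params, auth=AUTH)
assert response.status_code == 202, response.text
rq_id = response.json()["rq_id"]

# Poll with the rq_id until the dataset has been imported (201).
while response.status_code != 201:
    time.sleep(1)
    response = requests.get(
        f"{BASE_URL}/projects/1/dataset",
        params={"action": "import_status", "rq_id": rq_id},
        auth=AUTH,
    )
```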