/**
 * Uploads one file to the server in chunks using the tus protocol.
 *
 * @param {File} file - file to upload; its `name`, `type` and `size` are read
 * @param {Object} uploadConfig
 * @param {string} uploadConfig.endpoint - URL passed to tus as the upload endpoint
 * @param {number} uploadConfig.chunkSize - chunk size passed to tus
 * @param {number} [uploadConfig.totalSize] - size of the whole batch; used only for progress
 * @param {number} [uploadConfig.totalSentSize] - bytes sent by earlier uploads of the batch;
 *     used only for progress
 * @param {Function} [uploadConfig.onUpdate] - called with the uploaded fraction
 *     `(totalSentSize + bytesUploaded) / totalSize`; only invoked when both sizes are integers
 * @returns {Promise<number>} resolves with the number of bytes sent by THIS call
 *     (`file.size`), so a caller can accumulate it with `+=`; rejects with the tus error
 */
async function chunkUpload(file, uploadConfig) {
    const params = enableOrganization();
    const {
        endpoint, chunkSize, totalSize, totalSentSize, onUpdate,
    } = uploadConfig;

    return new Promise((resolve, reject) => {
        const upload = new tus.Upload(file, {
            endpoint,
            metadata: {
                filename: file.name,
                filetype: file.type,
            },
            headers: {
                Authorization: Axios.defaults.headers.common.Authorization,
            },
            chunkSize,
            retryDelays: null,
            onError(error) {
                reject(error);
            },
            onBeforeRequest(req) {
                const xhr = req.getUnderlyingObject();
                const { org } = params;
                req.setHeader('X-Organization', org);
                xhr.withCredentials = true;
            },
            onProgress(bytesUploaded) {
                // Progress is optional: callers that do not track a batch
                // (no integer sizes) simply get no updates.
                if (onUpdate && Number.isInteger(totalSentSize) && Number.isInteger(totalSize)) {
                    onUpdate((totalSentSize + bytesUploaded) / totalSize);
                }
            },
            onSuccess() {
                // Resolve with the size of this file only. The previous code
                // resolved the running total guarded by `if (totalSentSize)`,
                // which stayed 0 forever when the batch started at 0 and
                // would have been double counted by a `+=` caller otherwise.
                resolve(file.size);
            },
        });
        upload.start();
    });
}
/**
 * Imports an annotation file into a task or a job.
 *
 * The file is sent with a tus chunk upload bracketed by `Upload-Start` and
 * `Upload-Finish` POST requests; afterwards the same URL is polled with PUT
 * every 3 seconds for as long as the server answers 202.
 *
 * @param {string} session - 'task' or 'job'
 * @param {number|string} id - identifier of the task or job
 * @param {File} file - annotation file; `file.name` is sent as the `filename` query parameter
 * @param {string} format - annotation format name sent as the `format` query parameter
 * @returns {Promise<void>} resolves once the server stops answering 202
 * @throws the value produced by `generateError` when any request or the chunk upload fails
 */
async function uploadAnnotations(session, id, file, format) {
    const { backendAPI, origin } = config;
    const params = {
        ...enableOrganization(),
        format,
        filename: file.name,
    };
    const chunkSize = config.uploadChunkSize * 1024 * 1024;
    const uploadConfig = {
        chunkSize,
        endpoint: `${origin}${backendAPI}/${session}s/${id}/annotations/`,
    };
    const url = `${backendAPI}/${session}s/${id}/annotations`;

    try {
        await Axios.post(url, new FormData(), {
            params,
            proxy: config.proxy,
            headers: { 'Upload-Start': true },
        });
        await chunkUpload(file, uploadConfig);
        await Axios.post(url, new FormData(), {
            params,
            proxy: config.proxy,
            headers: { 'Upload-Finish': true },
        });
    } catch (errorData) {
        // Propagate the failure. Previously the error object was built and
        // dropped and `null` was returned, so a failed upload looked like a
        // successful import to the caller.
        throw generateError(errorData);
    }

    return new Promise((resolve, reject) => {
        async function requestStatus() {
            try {
                const response = await Axios.put(url, new FormData(), {
                    params,
                    proxy: config.proxy,
                });
                if (response.status === 202) {
                    // Import is still running on the server; ask again later.
                    setTimeout(requestStatus, 3000);
                } else {
                    resolve();
                }
            } catch (errorData) {
                reject(generateError(errorData));
            }
        }

        setTimeout(requestStatus);
    });
}
cvat.apps.engine.serializers import DataSerializer @@ -26,6 +25,7 @@ def __init__(self, file_id, upload_dir): self.offset = cache.get("tus-uploads/{}/offset".format(file_id)) def init_file(self): + os.makedirs(self.upload_dir, exist_ok=True) file_path = os.path.join(self.upload_dir, self.file_id) with open(file_path, 'wb') as file: file.seek(self.file_size - 1) @@ -100,7 +100,7 @@ class UploadMixin(object): 'Access-Control-Allow-Headers': "Tus-Resumable,upload-length,upload-metadata,Location,Upload-Offset,content-type", 'Cache-Control': 'no-store' } - _file_id_regex = r'(?P\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)' + file_id_regex = r'(?P\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)' def _tus_response(self, status, data=None, extra_headers=None): response = Response(data, status) @@ -147,9 +147,6 @@ def init_tus_upload(self, request): if request.method == 'OPTIONS': return self._tus_response(status=status.HTTP_204) else: - if not self.can_upload(): - return self._tus_response(data='Adding more data is not allowed', - status=status.HTTP_400_BAD_REQUEST) metadata = self._get_metadata(request) filename = metadata.get('filename', '') if not self.validate_filename(filename): @@ -173,13 +170,14 @@ def init_tus_upload(self, request): tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir()) + location = request.build_absolute_uri() + if 'HTTP_X_FORWARDED_HOST' not in request.META: + location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO') return self._tus_response( status=status.HTTP_201_CREATED, - extra_headers={'Location': '{}{}'.format(request.build_absolute_uri(), tus_file.file_id)}) + extra_headers={'Location': '{}{}'.format(location, tus_file.file_id)}) - @action(detail=True, methods=['HEAD', 'PATCH'], url_path=r'data/'+_file_id_regex) - def append_tus_chunk(self, request, pk, file_id): - self.get_object() # call check_object_permissions as well + def append_tus_chunk(self, 
request, file_id): if request.method == 'HEAD': tus_file = TusFile.get_tusfile(str(file_id), self.get_upload_dir()) if tus_file: @@ -211,26 +209,16 @@ def validate_filename(self, filename): file_path = os.path.join(upload_dir, filename) return os.path.commonprefix((os.path.realpath(file_path), upload_dir)) == upload_dir - def can_upload(self): - db_model = self.get_object() - model_data = db_model.data - return model_data.size == 0 - def get_upload_dir(self): - db_model = self.get_object() - return db_model.data.get_upload_dirname() + return self._object.data.get_upload_dirname() def get_request_client_files(self, request): - db_model = self.get_object() - serializer = DataSerializer(db_model, data=request.data) + serializer = DataSerializer(self._object, data=request.data) serializer.is_valid(raise_exception=True) data = {k: v for k, v in serializer.validated_data.items()} - return data.get('client_files', None); + return data.get('client_files', None) def append(self, request): - if not self.can_upload(): - return Response(data='Adding more data is not allowed', - status=status.HTTP_400_BAD_REQUEST) client_files = self.get_request_client_files(request) if client_files: upload_dir = self.get_upload_dir() diff --git a/cvat/apps/engine/models.py b/cvat/apps/engine/models.py index 3e8b0f2abfb..6501dbfdeab 100644 --- a/cvat/apps/engine/models.py +++ b/cvat/apps/engine/models.py @@ -305,6 +305,9 @@ def get_log_path(self): def get_task_artifacts_dirname(self): return os.path.join(self.get_task_dirname(), 'artifacts') + def get_tmp_dirname(self): + return os.path.join(self.get_task_dirname(), "tmp") + def __str__(self): return self.name diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 0c7ade2ffc5..ec9ca62207e 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -636,39 +636,67 @@ def jobs(self, request, pk): return Response(serializer.data) + # UploadMixin method + def get_upload_dir(self): + if 'annotations' in self.action: + 
# UploadMixin method
def upload_finished(self, request):
    """Handle the end of an upload for the current action of the task view.

    For the 'annotations' action the file named by the ``filename`` query
    parameter is looked up in the task tmp directory and handed to
    ``_import_annotations``. For the 'data' action the uploaded client files
    are attached to the task data and ``task.create`` is called.
    Any other action yields a 400 response.
    """
    if self.action == 'annotations':
        format_name = request.query_params.get("format", "")
        filename = request.query_params.get("filename", "")
        tmp_dir = os.path.realpath(self._object.get_tmp_dirname())
        annotation_file = os.path.realpath(os.path.join(tmp_dir, filename))
        # `filename` is taken from the query string, so it must not be allowed
        # to escape the tmp directory ("../..", absolute paths, symlinks):
        # the chosen file is parsed and removed once the import job ends.
        is_inside_tmp_dir = os.path.commonpath((tmp_dir, annotation_file)) == tmp_dir
        if is_inside_tmp_dir and os.path.isfile(annotation_file):
            return _import_annotations(
                request=request,
                filename=annotation_file,
                rq_id="{}@/api/tasks/{}/annotations/upload".format(request.user, self._object.pk),
                rq_func=dm.task.import_task_annotations,
                pk=self._object.pk,
                format_name=format_name,
            )
        else:
            return Response(data='No such file were uploaded',
                    status=status.HTTP_400_BAD_REQUEST)
    elif self.action == 'data':
        task_data = self._object.data
        serializer = DataSerializer(task_data, data=request.data)
        serializer.is_valid(raise_exception=True)
        data = dict(serializer.validated_data.items())
        # Merge files already stored by chunk uploads with the ones sent now.
        uploaded_files = task_data.get_uploaded_files()
        uploaded_files.extend(data.get('client_files'))
        serializer.validated_data.update({'client_files': uploaded_files})

        db_data = serializer.save()
        self._object.data = db_data
        self._object.save()
        data = {k: v for k, v in serializer.data.items()}

        data['use_zip_chunks'] = serializer.validated_data['use_zip_chunks']
        data['use_cache'] = serializer.validated_data['use_cache']
        data['copy_data'] = serializer.validated_data['copy_data']
        if data['use_cache']:
            self._object.data.storage_method = StorageMethodChoice.CACHE
            self._object.data.save(update_fields=['storage_method'])
        if data['server_files'] and not data.get('copy_data'):
            self._object.data.storage = StorageChoice.SHARE
            self._object.data.save(update_fields=['storage'])
        if db_data.cloud_storage:
            self._object.data.storage = StorageChoice.CLOUD_STORAGE
            self._object.data.save(update_fields=['storage'])
        # if the value of stop_frame is 0, then inside the function we cannot know
        # the value specified by the user or it's default value from the database
        if 'stop_frame' not in serializer.validated_data:
            data['stop_frame'] = None
        task.create(self._object.id, data)
        return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
    return Response(data='Unknown upload was finished',
            status=status.HTTP_400_BAD_REQUEST)
to a task. Supports tus uploads, see more https://tus.io/', @@ -700,14 +728,14 @@ def upload_finished(self, request): }, tags=['tasks'], versions=['2.0']) @action(detail=True, methods=['OPTIONS', 'POST', 'GET'], url_path=r'data/?$') def data(self, request, pk): - db_task = self.get_object() # call check_object_permissions as well + self._object = self.get_object() # call check_object_permissions as well if request.method == 'POST' or request.method == 'OPTIONS': - task_data = db_task.data + task_data = self._object.data if not task_data: task_data = Data.objects.create() task_data.make_dirs() - db_task.data = task_data - db_task.save() + self._object.data = task_data + self._object.save() elif task_data.size != 0: return Response(data='Adding more data is not supported', status=status.HTTP_400_BAD_REQUEST) @@ -719,10 +747,15 @@ def data(self, request, pk): data_quality = request.query_params.get('quality', 'compressed') data_getter = DataChunkGetter(data_type, data_num, data_quality, - db_task.dimension) + self._object.dimension) - return data_getter(request, db_task.data.start_frame, - db_task.data.stop_frame, db_task.data) + return data_getter(request, self._object.data.start_frame, + self._object.data.stop_frame, self._object.data) + + @action(detail=True, methods=['HEAD', 'PATCH'], url_path='data/'+UploadMixin.file_id_regex) + def append_data_chunk(self, request, pk, file_id): + self._object = self.get_object() + return self.append_tus_chunk(request, file_id) @extend_schema(methods=['GET'], summary='Method allows to download task annotations', parameters=[ @@ -759,14 +792,14 @@ def data(self, request, pk): responses={ '204': OpenApiResponse(description='The annotation has been deleted'), }, tags=['tasks'], versions=['2.0']) - @action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'], + @action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH', 'POST', 'OPTIONS'], url_path=r'annotations/?$', serializer_class=LabeledDataSerializer) def annotations(self, 
request, pk): - db_task = self.get_object() # force to call check_object_permissions + self._object = self.get_object() # force to call check_object_permissions if request.method == 'GET': format_name = request.query_params.get('format') if format_name: - return _export_annotations(db_instance=db_task, + return _export_annotations(db_instance=self._object, rq_id="/api/tasks/{}/annotations/{}".format(pk, format_name), request=request, action=request.query_params.get("action", "").lower(), @@ -779,6 +812,8 @@ def annotations(self, request, pk): serializer = LabeledDataSerializer(data=data) if serializer.is_valid(raise_exception=True): return Response(serializer.data) + elif request.method == 'POST' or request.method == 'OPTIONS': + return self.upload_data(request) elif request.method == 'PUT': format_name = request.query_params.get('format') if format_name: @@ -810,6 +845,11 @@ def annotations(self, request, pk): return Response(data=str(e), status=status.HTTP_400_BAD_REQUEST) return Response(data) + @action(detail=True, methods=['HEAD', 'PATCH'], url_path='annotations/'+UploadMixin.file_id_regex) + def append_annotations_chunk(self, request, pk, file_id): + self._object = self.get_object() + return self.append_tus_chunk(request, file_id) + @extend_schema( summary='When task is being created the method returns information about a status of the creation process', responses={ @@ -928,7 +968,7 @@ def dataset_export(self, request, pk): '200': JobWriteSerializer, }, tags=['jobs'], versions=['2.0'])) class JobViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, - mixins.RetrieveModelMixin, mixins.UpdateModelMixin): + mixins.RetrieveModelMixin, mixins.UpdateModelMixin, UploadMixin): queryset = Job.objects.all() iam_organization_field = 'segment__task__organization' search_fields = ('task_name', 'project_name', 'assignee', 'state', 'stage') @@ -960,6 +1000,34 @@ def get_serializer_class(self): else: return JobWriteSerializer + # UploadMixin method + def 
# UploadMixin method
def upload_finished(self, request):
    """Handle the end of an annotation upload for the current job.

    The file named by the ``filename`` query parameter is looked up in the
    tmp directory of the job's task and handed to ``_import_annotations``.
    Actions other than 'annotations' yield a 400 response.
    """
    task = self._object.segment.task
    if self.action == 'annotations':
        format_name = request.query_params.get("format", "")
        filename = request.query_params.get("filename", "")
        tmp_dir = os.path.realpath(task.get_tmp_dirname())
        annotation_file = os.path.realpath(os.path.join(tmp_dir, filename))
        # `filename` is taken from the query string, so it must not be allowed
        # to escape the tmp directory ("../..", absolute paths, symlinks):
        # the chosen file is parsed and removed once the import job ends.
        is_inside_tmp_dir = os.path.commonpath((tmp_dir, annotation_file)) == tmp_dir
        if is_inside_tmp_dir and os.path.isfile(annotation_file):
            return _import_annotations(
                request=request,
                filename=annotation_file,
                rq_id="{}@/api/jobs/{}/annotations/upload".format(request.user, self._object.pk),
                rq_func=dm.task.import_job_annotations,
                pk=self._object.pk,
                format_name=format_name,
            )
        else:
            return Response(data='No such file were uploaded',
                    status=status.HTTP_400_BAD_REQUEST)
    return Response(data='Unknown upload was finished',
            status=status.HTTP_400_BAD_REQUEST)
status=status.HTTP_400_BAD_REQUEST) return Response(data) + @action(detail=True, methods=['HEAD', 'PATCH'], url_path='annotations/'+UploadMixin.file_id_regex) + def append_annotations_chunk(self, request, pk, file_id): + self._object = self.get_object() + return self.append_tus_chunk(request, file_id) + @extend_schema( summary='Method returns list of issues for the job', responses={ @@ -1549,7 +1624,7 @@ def rq_handler(job, exc_type, exc_value, tb): return True -def _import_annotations(request, rq_id, rq_func, pk, format_name): +def _import_annotations(request, rq_id, rq_func, pk, format_name, filename=None): format_desc = {f.DISPLAY_NAME: f for f in dm.views.get_import_formats()}.get(format_name) if format_desc is None: @@ -1562,31 +1637,35 @@ def _import_annotations(request, rq_id, rq_func, pk, format_name): rq_job = queue.fetch_job(rq_id) if not rq_job: - serializer = AnnotationFileSerializer(data=request.data) - if serializer.is_valid(raise_exception=True): - anno_file = serializer.validated_data['annotation_file'] - fd, filename = mkstemp(prefix='cvat_{}'.format(pk)) - with open(filename, 'wb+') as f: - for chunk in anno_file.chunks(): - f.write(chunk) - - av_scan_paths(filename) - rq_job = queue.enqueue_call( - func=rq_func, - args=(pk, filename, format_name), - job_id=rq_id - ) - rq_job.meta['tmp_file'] = filename - rq_job.meta['tmp_file_descriptor'] = fd - rq_job.save_meta() + # If filename is specified we consider that file was uploaded via TUS, so it exists in filesystem + # Then we dont need to create temporary file + fd = None + if not filename: + serializer = AnnotationFileSerializer(data=request.data) + if serializer.is_valid(raise_exception=True): + anno_file = serializer.validated_data['annotation_file'] + fd, filename = mkstemp(prefix='cvat_{}'.format(pk)) + with open(filename, 'wb+') as f: + for chunk in anno_file.chunks(): + f.write(chunk) + + av_scan_paths(filename) + rq_job = queue.enqueue_call( + func=rq_func, + args=(pk, filename, 
format_name), + job_id=rq_id + ) + rq_job.meta['tmp_file'] = filename + rq_job.meta['tmp_file_descriptor'] = fd + rq_job.save_meta() else: if rq_job.is_finished: - os.close(rq_job.meta['tmp_file_descriptor']) + if rq_job.meta['tmp_file_descriptor']: os.close(rq_job.meta['tmp_file_descriptor']) os.remove(rq_job.meta['tmp_file']) rq_job.delete() return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed: - os.close(rq_job.meta['tmp_file_descriptor']) + if rq_job.meta['tmp_file_descriptor']: os.close(rq_job.meta['tmp_file_descriptor']) os.remove(rq_job.meta['tmp_file']) exc_info = str(rq_job.exc_info) rq_job.delete() diff --git a/cvat/settings/base.py b/cvat/settings/base.py index f7aa06b8c27..e81b0d2383a 100644 --- a/cvat/settings/base.py +++ b/cvat/settings/base.py @@ -487,6 +487,10 @@ def add_ssh_keys(): # How django uses X-Forwarded-Proto - https://docs.djangoproject.com/en/2.2/ref/settings/#secure-proxy-ssl-header SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https') +# Forwarded host - https://docs.djangoproject.com/en/4.0/ref/settings/#std:setting-USE_X_FORWARDED_HOST +# Is used in TUS uploads to provide correct upload endpoint +USE_X_FORWARDED_HOST = True + # Django-sendfile requires to set SENDFILE_ROOT # https://github.com/moggers87/django-sendfile2 SENDFILE_ROOT = BASE_DIR @@ -528,3 +532,4 @@ def add_ssh_keys(): # OTHER SETTINGS # https://drf-spectacular.readthedocs.io/en/latest/settings.html } + diff --git a/utils/cli/tests/test_cli.py b/utils/cli/tests/test_cli.py index 84698c042d5..a6022f4426b 100644 --- a/utils/cli/tests/test_cli.py +++ b/utils/cli/tests/test_cli.py @@ -59,7 +59,7 @@ def test_tasks_list(self): def test_tasks_delete(self): self.cli.tasks_delete([1]) self.cli.tasks_list(False) - self.assertNotRegex(self.mock_stdout.getvalue(), '.*{}.*'.format(self.taskname)) + self.assertRegex(self.mock_stdout.getvalue(), '.*Task ID {} deleted.*'.format(1)) def test_tasks_dump(self): path = os.path.join(settings.SHARE_ROOT, 
'test_cli.xml')