From cbc59ef3dd0ec43056f332854164f9f6958d8711 Mon Sep 17 00:00:00 2001
From: Maria Khrustaleva
Date: Tue, 27 Dec 2022 22:16:30 +0200
Subject: [PATCH] Simplify uploading data for a task (#5498)

It is now possible to specify only a manifest file and a filename pattern
when creating a task with cloud storage data. The pattern currently supports
the special characters `*`, `?`, `[seq]`, and `[!seq]`. Please see
[here](https://github.com/opencv/cvat/blob/8898a8b2647514dd6f3f6ce83745b1ca8ef72bce/tests/python/rest_api/test_tasks.py#L686)
for examples of how to use this functionality.
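For illustration, here is a minimal sketch of creating such a task through the
REST API (not part of the patch; the host, credentials, cloud storage id, and
file names are placeholders, and the payload mirrors the `data_spec` used in
the new tests):

    import requests

    CVAT_URL = "http://localhost:8080"  # placeholder host
    AUTH = ("admin", "password")        # placeholder credentials

    # 1. Create the task with its labels.
    task = requests.post(
        f"{CVAT_URL}/api/tasks",
        auth=AUTH,
        json={"name": "task with cloud data", "labels": [{"name": "car"}]},
    ).json()

    # 2. Attach the data: only the manifest file is listed in server_files;
    #    the images are selected from the cloud storage manifest by the
    #    fnmatch-style filename_pattern (here, every .jpeg under test/).
    response = requests.post(
        f"{CVAT_URL}/api/tasks/{task['id']}/data",
        auth=AUTH,
        json={
            "image_quality": 75,
            "use_cache": True,      # required: a manifest only works with the cache storage method
            "cloud_storage_id": 1,  # placeholder cloud storage id
            "server_files": ["manifest.jsonl"],
            "filename_pattern": "test/*.jpeg",
        },
    )
    response.raise_for_status()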
Co-authored-by: Maxim Zhiltsov
---
 CHANGELOG.md                         |   2 +-
 cvat/apps/engine/serializers.py      |   4 +-
 cvat/apps/engine/task.py             | 130 +++++++++++++++++----------
 tests/python/rest_api/test_tasks.py  | 119 ++++++++++++++++++++++++
 tests/python/shared/utils/helpers.py |   5 +-
 5 files changed, 211 insertions(+), 49 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2fa0b2dca4..7a0fc6897d5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## \[2.4.0] - Unreleased
 ### Added
-- TDB
+- Filename pattern to simplify uploading cloud storage data for a task ()

 ### Changed
 - TDB
diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py
index fd2448b5ad0..abb29bd36e3 100644
--- a/cvat/apps/engine/serializers.py
+++ b/cvat/apps/engine/serializers.py
@@ -371,12 +371,13 @@ class DataSerializer(WriteOnceMixin, serializers.ModelSerializer):
     use_cache = serializers.BooleanField(default=False)
     copy_data = serializers.BooleanField(default=False)
     cloud_storage_id = serializers.IntegerField(write_only=True, allow_null=True, required=False)
+    filename_pattern = serializers.CharField(allow_null=True, required=False)

     class Meta:
         model = models.Data
         fields = ('chunk_size', 'size', 'image_quality', 'start_frame', 'stop_frame', 'frame_filter',
             'compressed_chunk_type', 'original_chunk_type', 'client_files', 'server_files', 'remote_files', 'use_zip_chunks',
-            'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method')
+            'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method', 'filename_pattern')

     # pylint: disable=no-self-use
     def validate_frame_filter(self, value):
@@ -396,6 +397,7 @@ def validate(self, attrs):
         if 'start_frame' in attrs and 'stop_frame' in attrs \
             and attrs['start_frame'] > attrs['stop_frame']:
             raise serializers.ValidationError('Stop frame must be more or equal start frame')
+
         return attrs

     def create(self, validated_data):
diff --git a/cvat/apps/engine/task.py b/cvat/apps/engine/task.py
index b724830e8cc..a4bcd8d5c33 100644
--- a/cvat/apps/engine/task.py
+++ b/cvat/apps/engine/task.py
@@ -5,6 +5,7 @@
 # SPDX-License-Identifier: MIT

 import itertools
+import fnmatch
 import os
 import sys
 from rest_framework.serializers import ValidationError
@@ -127,7 +128,7 @@ def _save_task_to_db(db_task, extractor):
     db_task.data.save()
     db_task.save()

-def _count_files(data, manifest_files=None):
+def _count_files(data):
     share_root = settings.SHARE_ROOT
     server_files = []
@@ -158,7 +159,7 @@ def count_files(file_mapping, counter):
             if mime in counter:
                 counter[mime].append(rel_path)
             elif rel_path.endswith('.jsonl'):
-                manifest_files.append(rel_path)
+                continue
             else:
                 slogger.glob.warn("Skip '{}' file (its mime type doesn't "
                     "correspond to supported MIME file type)".format(full_path))
@@ -177,6 +178,12 @@ def count_files(file_mapping, counter):

     return counter

+def _find_manifest_files(data):
+    manifest_files = []
+    for files in ['client_files', 'server_files', 'remote_files']:
+        manifest_files.extend(list(filter(lambda x: x.endswith('.jsonl'), data[files])))
+    return manifest_files
+
 def _validate_data(counter, manifest_files=None):
     unique_entries = 0
     multiple_entries = 0
@@ -207,10 +214,10 @@

     return counter, task_modes[0]

-def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage):
+def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage, data_storage_method):
     if manifests:
         if len(manifests) != 1:
-            raise Exception('Only one manifest file can be attached with data')
+            raise ValidationError('Only one manifest file can be attached to data')
         manifest_file = manifests[0]
         full_manifest_path = os.path.join(root_dir, manifests[0])
         if is_in_cloud:
@@ -221,8 +228,10 @@
                     < cloud_storage_instance.get_file_last_modified(manifest_file):
                 cloud_storage_instance.download_file(manifest_file, full_manifest_path)
         if is_manifest(full_manifest_path):
+            if not (settings.USE_CACHE or data_storage_method != models.StorageMethodChoice.CACHE):
+                raise ValidationError("A manifest file can only be uploaded if the 'Use cache' option is also selected")
             return manifest_file
-        raise Exception('Invalid manifest was uploaded')
+        raise ValidationError('Invalid manifest was uploaded')
     return None

 def _validate_url(url):
@@ -291,6 +300,26 @@ def _download_data(urls, upload_dir):
 def _get_manifest_frame_indexer(start_frame=0, frame_step=1):
     return lambda frame_id: start_frame + frame_id * frame_step

+def _create_task_manifest_based_on_cloud_storage_manifest(
+    sorted_media,
+    cloud_storage_manifest_prefix,
+    cloud_storage_manifest,
+    manifest
+):
+    if cloud_storage_manifest_prefix:
+        sorted_media_without_manifest_prefix = [
+            os.path.relpath(i, cloud_storage_manifest_prefix) for i in sorted_media
+        ]
+        sequence, raw_content = cloud_storage_manifest.get_subset(sorted_media_without_manifest_prefix)
+        def _add_prefix(properties):
+            file_name = properties['name']
+            properties['name'] = os.path.join(cloud_storage_manifest_prefix, file_name)
+            return properties
+        content = list(map(_add_prefix, raw_content))
+    else:
+        sequence, content = cloud_storage_manifest.get_subset(sorted_media)
+    sorted_content = (i[1] for i in sorted(zip(sequence, content)))
+    manifest.create(sorted_content)

 @transaction.atomic
 def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):
@@ -300,69 +329,80 @@ def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):

     slogger.glob.info("create task #{}".format(db_task.id))

     db_data = db_task.data
-    upload_dir = db_data.get_upload_dirname()
+    upload_dir = db_data.get_upload_dirname() if db_data.storage != models.StorageChoice.SHARE else settings.SHARE_ROOT
     is_data_in_cloud = db_data.storage == models.StorageChoice.CLOUD_STORAGE

     if data['remote_files'] and not isDatasetImport:
         data['remote_files'] = _download_data(data['remote_files'], upload_dir)

-    manifest_files = []
-    media = _count_files(data, manifest_files)
-    media, task_mode = _validate_data(media, manifest_files)
-
-    if data['server_files']:
-        if db_data.storage == models.StorageChoice.LOCAL:
-            _copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path'))
-        elif db_data.storage == models.StorageChoice.SHARE:
-            upload_dir = settings.SHARE_ROOT
-
+    # find and validate manifest file
+    manifest_files = _find_manifest_files(data)
     manifest_root = None
-    if db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:
+
+    # the copy_data case is handled separately, because files from the share have not been downloaded yet
+    if data['copy_data']:
+        manifest_root = settings.SHARE_ROOT
+    elif db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:
         manifest_root = upload_dir
     elif is_data_in_cloud:
         manifest_root = db_data.cloud_storage.get_storage_dirname()
+
     manifest_file = _validate_manifest(
         manifest_files, manifest_root,
-        is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None
+        is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None,
+        db_data.storage_method,
     )
-    if manifest_file and (not settings.USE_CACHE or db_data.storage_method != models.StorageMethodChoice.CACHE):
-        raise Exception("File with meta information can be uploaded if 'Use cache' option is also selected")

-    if data['server_files'] and is_data_in_cloud:
+    if is_data_in_cloud:
         cloud_storage_instance = db_storage_to_storage_instance(db_data.cloud_storage)
-        sorted_media = sort(media['image'], data['sorting_method'])
-
-        data_size = len(sorted_media)
-        segment_step, *_ = _get_task_segment_data(db_task, data_size)
-        for start_frame in range(0, data_size, segment_step):
-            first_sorted_media_image = sorted_media[start_frame]
-            cloud_storage_instance.download_file(first_sorted_media_image, os.path.join(upload_dir, first_sorted_media_image))

         # prepare task manifest file from cloud storage manifest file
         # NOTE we should create manifest before defining chunk_size
         # FIXME in the future when will be implemented archive support
         manifest = ImageManifestManager(db_data.get_manifest_path())
         cloud_storage_manifest = ImageManifestManager(
             os.path.join(db_data.cloud_storage.get_storage_dirname(), manifest_file),
             db_data.cloud_storage.get_storage_dirname()
         )
-        cloud_storage_manifest_prefix = os.path.dirname(manifest_file)
         cloud_storage_manifest.set_index()
+        cloud_storage_manifest_prefix = os.path.dirname(manifest_file)
+
+    # extend server_files when the task is created from a manifest file plus a filename pattern
+    if is_data_in_cloud and data['filename_pattern']:
+        if len(data['server_files']) != 1:
+            extra_files_count = len(data['server_files']) - 1
+            raise ValidationError(
+                'Using a filename_pattern is only supported with a manifest file, '
+                f'but {extra_files_count} other file{"s" if extra_files_count > 1 else ""} '
+                f'{"were" if extra_files_count > 1 else "was"} found. '
+                'Please remove the extra files and keep only the manifest file in the server_files field.'
+ ) + + cloud_storage_manifest_data = list(cloud_storage_manifest.data) if not cloud_storage_manifest_prefix \ + else [os.path.join(cloud_storage_manifest_prefix, f) for f in cloud_storage_manifest.data] + if data['filename_pattern'] == '*': + server_files = cloud_storage_manifest_data else: - sequence, content = cloud_storage_manifest.get_subset(sorted_media) - sorted_content = (i[1] for i in sorted(zip(sequence, content))) - manifest.create(sorted_content) + server_files = fnmatch.filter(cloud_storage_manifest_data, data['filename_pattern']) + data['server_files'].extend(server_files) + + # count and validate uploaded files + media = _count_files(data) + media, task_mode = _validate_data(media, manifest_files) + + if data['server_files']: + if db_data.storage == models.StorageChoice.LOCAL: + _copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path')) + elif is_data_in_cloud: + sorted_media = sort(media['image'], data['sorting_method']) + + # download previews from cloud storage + data_size = len(sorted_media) + segment_step, *_ = _get_task_segment_data(db_task, data_size) + for preview_frame in range(0, data_size, segment_step): + preview = sorted_media[preview_frame] + cloud_storage_instance.download_file(preview, os.path.join(upload_dir, preview)) + + # Define task manifest content based on cloud storage manifest content and uploaded files + _create_task_manifest_based_on_cloud_storage_manifest( + sorted_media, cloud_storage_manifest_prefix, + cloud_storage_manifest, manifest) av_scan_paths(upload_dir) diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index 9df6d10c31f..08800aef9ca 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -4,8 +4,12 @@ # SPDX-License-Identifier: MIT import json +import os.path as osp +import subprocess from copy import deepcopy +from functools import partial from http import HTTPStatus +from tempfile import TemporaryDirectory from time import sleep import pytest @@ -13,6 +17,7 @@ from cvat_sdk.core.helpers import get_paginated_collection from deepdiff import DeepDiff +import shared.utils.s3 as s3 from shared.utils.config import get_method, make_api_client, patch_method from shared.utils.helpers import generate_image_files @@ -675,6 +680,120 @@ def test_create_task_with_cloud_storage_files( self._USERNAME, task_spec, data_spec, content_type="application/json", org=org ) + @pytest.mark.with_external_services + @pytest.mark.parametrize("cloud_storage_id", [1]) + @pytest.mark.parametrize( + "manifest, filename_pattern, sub_dir, task_size", + [ + ("manifest.jsonl", "*", True, 3), # public bucket + ("manifest.jsonl", "test/*", True, 3), + ("manifest.jsonl", "test/sub*1.jpeg", True, 1), + ("manifest.jsonl", "*image*.jpeg", True, 3), + ("manifest.jsonl", "wrong_pattern", True, 0), + ("abc_manifest.jsonl", "[a-c]*.jpeg", False, 2), + ("abc_manifest.jsonl", "[d]*.jpeg", False, 1), + ("abc_manifest.jsonl", "[e-z]*.jpeg", False, 0), + ], + ) + @pytest.mark.parametrize("org", [""]) + def test_create_task_with_file_pattern( + self, + cloud_storage_id, + manifest, + filename_pattern, + sub_dir, + task_size, + org, + cloud_storages, + request, + ): + # prepare dataset on the bucket + prefixes = ("test_image_",) * 3 if sub_dir else ("a_", "b_", "d_") + images = generate_image_files(3, prefixes=prefixes) + s3_client = s3.make_client() + + cloud_storage = cloud_storages[cloud_storage_id] + + for image in images: + s3_client.create_file( + data=image, + 
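+                # the object key places files under 'test/sub/' when the
+                # sub-directory layout is requested, otherwise at the bucket root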
bucket=cloud_storage["resource"], + filename=f"{'test/sub/' if sub_dir else ''}{image.name}", + ) + request.addfinalizer( + partial( + s3_client.remove_file, + bucket=cloud_storage["resource"], + filename=f"{'test/sub/' if sub_dir else ''}{image.name}", + ) + ) + + with TemporaryDirectory() as tmp_dir: + for image in images: + with open(osp.join(tmp_dir, image.name), "wb") as f: + f.write(image.getvalue()) + + command = [ + "docker", + "run", + "--rm", + "-u", + "root:root", + "-v", + f"{tmp_dir}:/local", + "--entrypoint", + "python3", + "cvat/server", + "utils/dataset_manifest/create.py", + "--output-dir", + "/local", + "/local", + ] + subprocess.run(command, check=True) + with open(osp.join(tmp_dir, "manifest.jsonl"), mode="rb") as m_file: + s3_client.create_file( + data=m_file.read(), + bucket=cloud_storage["resource"], + filename=f"test/sub/{manifest}" if sub_dir else manifest, + ) + request.addfinalizer( + partial( + s3_client.remove_file, + bucket=cloud_storage["resource"], + filename=f"test/sub/{manifest}" if sub_dir else manifest, + ) + ) + + task_spec = { + "name": f"Task with files from cloud storage {cloud_storage_id}", + "labels": [ + { + "name": "car", + } + ], + } + + data_spec = { + "image_quality": 75, + "use_cache": True, + "cloud_storage_id": cloud_storage_id, + "server_files": [f"test/sub/{manifest}" if sub_dir else manifest], + "filename_pattern": filename_pattern, + } + + if task_size: + task_id = self._test_create_task( + self._USERNAME, task_spec, data_spec, content_type="application/json", org=org + ) + + with make_api_client(self._USERNAME) as api_client: + (task, response) = api_client.tasks_api.retrieve(task_id, org=org) + assert response.status == HTTPStatus.OK + assert task.size == task_size + else: + status = self._test_cannot_create_task(self._USERNAME, task_spec, data_spec) + assert "No media data found" in status.message + @pytest.mark.with_external_services @pytest.mark.parametrize( "cloud_storage_id, manifest, org", diff --git a/tests/python/shared/utils/helpers.py b/tests/python/shared/utils/helpers.py index a8a7120c78b..e7289ac3bee 100644 --- a/tests/python/shared/utils/helpers.py +++ b/tests/python/shared/utils/helpers.py @@ -18,10 +18,11 @@ def generate_image_file(filename="image.png", size=(50, 50), color=(0, 0, 0)): return f -def generate_image_files(count) -> List[BytesIO]: +def generate_image_files(count, prefixes=None) -> List[BytesIO]: images = [] for i in range(count): - image = generate_image_file(f"{i}.jpeg", color=(i, i, i)) + prefix = prefixes[i] if prefixes else "" + image = generate_image_file(f"{prefix}{i}.jpeg", color=(i, i, i)) images.append(image) return images
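
A note on pattern semantics (illustration, not part of the patch): the server
filters manifest entries with Python's `fnmatch`, so `*` also matches across
`/`, while `[seq]` and `[!seq]` select by character ranges, as the new test
parameters rely on:

    import fnmatch

    files = ["a_0.jpeg", "b_1.jpeg", "d_2.jpeg"]
    print(fnmatch.filter(files, "[a-c]*.jpeg"))  # ['a_0.jpeg', 'b_1.jpeg']
    print(fnmatch.filter(files, "[d]*.jpeg"))    # ['d_2.jpeg']
    print(fnmatch.filter(files, "[e-z]*.jpeg"))  # []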