Skip to content
This repository has been archived by the owner on Nov 14, 2023. It is now read-only.

Commit

Permalink
Simplify upload data for task (cvat-ai#5498)
Browse files Browse the repository at this point in the history
It's possible to specify only the manifest file and filename pattern for
creating task with cloud storage data.
The special characters supported now for the pattern are `*`, `?`,
`[seq]`, `[!seq]`.
Please see
[here](https://github.com/opencv/cvat/blob/8898a8b2647514dd6f3f6ce83745b1ca8ef72bce/tests/python/rest_api/test_tasks.py#L686)
for some examples of how to use this functionality.

Co-authored-by: Maxim Zhiltsov <zhiltsov.max35@gmail.com>
  • Loading branch information
2 people authored and mikhail-treskin committed Jul 1, 2023
1 parent 9d9d304 commit cbc59ef
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 49 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## \[2.4.0] - Unreleased
### Added
- TDB
- Filename pattern to simplify uploading cloud storage data for a task (<https://github.com/opencv/cvat/pull/5498>)

### Changed
- TDB
Expand Down
4 changes: 3 additions & 1 deletion cvat/apps/engine/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,13 @@ class DataSerializer(WriteOnceMixin, serializers.ModelSerializer):
use_cache = serializers.BooleanField(default=False)
copy_data = serializers.BooleanField(default=False)
cloud_storage_id = serializers.IntegerField(write_only=True, allow_null=True, required=False)
filename_pattern = serializers.CharField(allow_null=True, required=False)

class Meta:
model = models.Data
fields = ('chunk_size', 'size', 'image_quality', 'start_frame', 'stop_frame', 'frame_filter',
'compressed_chunk_type', 'original_chunk_type', 'client_files', 'server_files', 'remote_files', 'use_zip_chunks',
'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method')
'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method', 'filename_pattern')

# pylint: disable=no-self-use
def validate_frame_filter(self, value):
Expand All @@ -396,6 +397,7 @@ def validate(self, attrs):
if 'start_frame' in attrs and 'stop_frame' in attrs \
and attrs['start_frame'] > attrs['stop_frame']:
raise serializers.ValidationError('Stop frame must be more or equal start frame')

return attrs

def create(self, validated_data):
Expand Down
130 changes: 85 additions & 45 deletions cvat/apps/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# SPDX-License-Identifier: MIT

import itertools
import fnmatch
import os
import sys
from rest_framework.serializers import ValidationError
Expand Down Expand Up @@ -127,7 +128,7 @@ def _save_task_to_db(db_task, extractor):
db_task.data.save()
db_task.save()

def _count_files(data, manifest_files=None):
def _count_files(data):
share_root = settings.SHARE_ROOT
server_files = []

Expand Down Expand Up @@ -158,7 +159,7 @@ def count_files(file_mapping, counter):
if mime in counter:
counter[mime].append(rel_path)
elif rel_path.endswith('.jsonl'):
manifest_files.append(rel_path)
continue
else:
slogger.glob.warn("Skip '{}' file (its mime type doesn't "
"correspond to supported MIME file type)".format(full_path))
Expand All @@ -177,6 +178,12 @@ def count_files(file_mapping, counter):

return counter

def _find_manifest_files(data):
manifest_files = []
for files in ['client_files', 'server_files', 'remote_files']:
manifest_files.extend(list(filter(lambda x: x.endswith('.jsonl'), data[files])))
return manifest_files

def _validate_data(counter, manifest_files=None):
unique_entries = 0
multiple_entries = 0
Expand Down Expand Up @@ -207,10 +214,10 @@ def _validate_data(counter, manifest_files=None):

return counter, task_modes[0]

def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage):
def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage, data_storage_method):
if manifests:
if len(manifests) != 1:
raise Exception('Only one manifest file can be attached with data')
raise ValidationError('Only one manifest file can be attached to data')
manifest_file = manifests[0]
full_manifest_path = os.path.join(root_dir, manifests[0])
if is_in_cloud:
Expand All @@ -221,8 +228,10 @@ def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage):
< cloud_storage_instance.get_file_last_modified(manifest_file):
cloud_storage_instance.download_file(manifest_file, full_manifest_path)
if is_manifest(full_manifest_path):
if not (settings.USE_CACHE or data_storage_method != models.StorageMethodChoice.CACHE):
raise ValidationError("Manifest file can be uploaded only if 'Use cache' option is also selected")
return manifest_file
raise Exception('Invalid manifest was uploaded')
raise ValidationError('Invalid manifest was uploaded')
return None

def _validate_url(url):
Expand Down Expand Up @@ -291,6 +300,26 @@ def _download_data(urls, upload_dir):
def _get_manifest_frame_indexer(start_frame=0, frame_step=1):
return lambda frame_id: start_frame + frame_id * frame_step

def _create_task_manifest_based_on_cloud_storage_manifest(
sorted_media,
cloud_storage_manifest_prefix,
cloud_storage_manifest,
manifest
):
if cloud_storage_manifest_prefix:
sorted_media_without_manifest_prefix = [
os.path.relpath(i, cloud_storage_manifest_prefix) for i in sorted_media
]
sequence, raw_content = cloud_storage_manifest.get_subset(sorted_media_without_manifest_prefix)
def _add_prefix(properties):
file_name = properties['name']
properties['name'] = os.path.join(cloud_storage_manifest_prefix, file_name)
return properties
content = list(map(_add_prefix, raw_content))
else:
sequence, content = cloud_storage_manifest.get_subset(sorted_media)
sorted_content = (i[1] for i in sorted(zip(sequence, content)))
manifest.create(sorted_content)

@transaction.atomic
def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):
Expand All @@ -300,69 +329,80 @@ def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):
slogger.glob.info("create task #{}".format(db_task.id))

db_data = db_task.data
upload_dir = db_data.get_upload_dirname()
upload_dir = db_data.get_upload_dirname() if db_data.storage != models.StorageChoice.SHARE else settings.SHARE_ROOT
is_data_in_cloud = db_data.storage == models.StorageChoice.CLOUD_STORAGE

if data['remote_files'] and not isDatasetImport:
data['remote_files'] = _download_data(data['remote_files'], upload_dir)

manifest_files = []
media = _count_files(data, manifest_files)
media, task_mode = _validate_data(media, manifest_files)

if data['server_files']:
if db_data.storage == models.StorageChoice.LOCAL:
_copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path'))
elif db_data.storage == models.StorageChoice.SHARE:
upload_dir = settings.SHARE_ROOT

# find and validate manifest file
manifest_files = _find_manifest_files(data)
manifest_root = None
if db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:

# we should also handle this case because files from the share source have not been downloaded yet
if data['copy_data']:
manifest_root = settings.SHARE_ROOT
elif db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:
manifest_root = upload_dir
elif is_data_in_cloud:
manifest_root = db_data.cloud_storage.get_storage_dirname()

manifest_file = _validate_manifest(
manifest_files, manifest_root,
is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None
is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None,
db_data.storage_method,
)
if manifest_file and (not settings.USE_CACHE or db_data.storage_method != models.StorageMethodChoice.CACHE):
raise Exception("File with meta information can be uploaded if 'Use cache' option is also selected")

if data['server_files'] and is_data_in_cloud:
if is_data_in_cloud:
cloud_storage_instance = db_storage_to_storage_instance(db_data.cloud_storage)
sorted_media = sort(media['image'], data['sorting_method'])

data_size = len(sorted_media)
segment_step, *_ = _get_task_segment_data(db_task, data_size)
for start_frame in range(0, data_size, segment_step):
first_sorted_media_image = sorted_media[start_frame]
cloud_storage_instance.download_file(first_sorted_media_image, os.path.join(upload_dir, first_sorted_media_image))

# prepare task manifest file from cloud storage manifest file
# NOTE we should create manifest before defining chunk_size
# FIXME in the future when will be implemented archive support
manifest = ImageManifestManager(db_data.get_manifest_path())
cloud_storage_manifest = ImageManifestManager(
os.path.join(db_data.cloud_storage.get_storage_dirname(), manifest_file),
db_data.cloud_storage.get_storage_dirname()
)
cloud_storage_manifest_prefix = os.path.dirname(manifest_file)
cloud_storage_manifest.set_index()
if cloud_storage_manifest_prefix:
sorted_media_without_manifest_prefix = [
os.path.relpath(i, cloud_storage_manifest_prefix) for i in sorted_media
]
sequence, raw_content = cloud_storage_manifest.get_subset(sorted_media_without_manifest_prefix)
def _add_prefix(properties):
file_name = properties['name']
properties['name'] = os.path.join(cloud_storage_manifest_prefix, file_name)
return properties
content = list(map(_add_prefix, raw_content))
cloud_storage_manifest_prefix = os.path.dirname(manifest_file)

# update list with server files if task creation approach with pattern and manifest file is used
if is_data_in_cloud and data['filename_pattern']:
if 1 != len(data['server_files']):
l = len(data['server_files']) - 1
raise ValidationError(
'Using a filename_pattern is only supported with a manifest file, '
f'but others {l} file{"s" if l > 1 else ""} {"were" if l > 1 else "was"} found'
'Please remove extra files and keep only manifest file in server_files field.'
)

cloud_storage_manifest_data = list(cloud_storage_manifest.data) if not cloud_storage_manifest_prefix \
else [os.path.join(cloud_storage_manifest_prefix, f) for f in cloud_storage_manifest.data]
if data['filename_pattern'] == '*':
server_files = cloud_storage_manifest_data
else:
sequence, content = cloud_storage_manifest.get_subset(sorted_media)
sorted_content = (i[1] for i in sorted(zip(sequence, content)))
manifest.create(sorted_content)
server_files = fnmatch.filter(cloud_storage_manifest_data, data['filename_pattern'])
data['server_files'].extend(server_files)

# count and validate uploaded files
media = _count_files(data)
media, task_mode = _validate_data(media, manifest_files)

if data['server_files']:
if db_data.storage == models.StorageChoice.LOCAL:
_copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path'))
elif is_data_in_cloud:
sorted_media = sort(media['image'], data['sorting_method'])

# download previews from cloud storage
data_size = len(sorted_media)
segment_step, *_ = _get_task_segment_data(db_task, data_size)
for preview_frame in range(0, data_size, segment_step):
preview = sorted_media[preview_frame]
cloud_storage_instance.download_file(preview, os.path.join(upload_dir, preview))

# Define task manifest content based on cloud storage manifest content and uploaded files
_create_task_manifest_based_on_cloud_storage_manifest(
sorted_media, cloud_storage_manifest_prefix,
cloud_storage_manifest, manifest)

av_scan_paths(upload_dir)

Expand Down
119 changes: 119 additions & 0 deletions tests/python/rest_api/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@
# SPDX-License-Identifier: MIT

import json
import os.path as osp
import subprocess
from copy import deepcopy
from functools import partial
from http import HTTPStatus
from tempfile import TemporaryDirectory
from time import sleep

import pytest
from cvat_sdk.api_client import apis, models
from cvat_sdk.core.helpers import get_paginated_collection
from deepdiff import DeepDiff

import shared.utils.s3 as s3
from shared.utils.config import get_method, make_api_client, patch_method
from shared.utils.helpers import generate_image_files

Expand Down Expand Up @@ -675,6 +680,120 @@ def test_create_task_with_cloud_storage_files(
self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
)

@pytest.mark.with_external_services
@pytest.mark.parametrize("cloud_storage_id", [1])
@pytest.mark.parametrize(
"manifest, filename_pattern, sub_dir, task_size",
[
("manifest.jsonl", "*", True, 3), # public bucket
("manifest.jsonl", "test/*", True, 3),
("manifest.jsonl", "test/sub*1.jpeg", True, 1),
("manifest.jsonl", "*image*.jpeg", True, 3),
("manifest.jsonl", "wrong_pattern", True, 0),
("abc_manifest.jsonl", "[a-c]*.jpeg", False, 2),
("abc_manifest.jsonl", "[d]*.jpeg", False, 1),
("abc_manifest.jsonl", "[e-z]*.jpeg", False, 0),
],
)
@pytest.mark.parametrize("org", [""])
def test_create_task_with_file_pattern(
self,
cloud_storage_id,
manifest,
filename_pattern,
sub_dir,
task_size,
org,
cloud_storages,
request,
):
# prepare dataset on the bucket
prefixes = ("test_image_",) * 3 if sub_dir else ("a_", "b_", "d_")
images = generate_image_files(3, prefixes=prefixes)
s3_client = s3.make_client()

cloud_storage = cloud_storages[cloud_storage_id]

for image in images:
s3_client.create_file(
data=image,
bucket=cloud_storage["resource"],
filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
)
request.addfinalizer(
partial(
s3_client.remove_file,
bucket=cloud_storage["resource"],
filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
)
)

with TemporaryDirectory() as tmp_dir:
for image in images:
with open(osp.join(tmp_dir, image.name), "wb") as f:
f.write(image.getvalue())

command = [
"docker",
"run",
"--rm",
"-u",
"root:root",
"-v",
f"{tmp_dir}:/local",
"--entrypoint",
"python3",
"cvat/server",
"utils/dataset_manifest/create.py",
"--output-dir",
"/local",
"/local",
]
subprocess.run(command, check=True)
with open(osp.join(tmp_dir, "manifest.jsonl"), mode="rb") as m_file:
s3_client.create_file(
data=m_file.read(),
bucket=cloud_storage["resource"],
filename=f"test/sub/{manifest}" if sub_dir else manifest,
)
request.addfinalizer(
partial(
s3_client.remove_file,
bucket=cloud_storage["resource"],
filename=f"test/sub/{manifest}" if sub_dir else manifest,
)
)

task_spec = {
"name": f"Task with files from cloud storage {cloud_storage_id}",
"labels": [
{
"name": "car",
}
],
}

data_spec = {
"image_quality": 75,
"use_cache": True,
"cloud_storage_id": cloud_storage_id,
"server_files": [f"test/sub/{manifest}" if sub_dir else manifest],
"filename_pattern": filename_pattern,
}

if task_size:
task_id = self._test_create_task(
self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
)

with make_api_client(self._USERNAME) as api_client:
(task, response) = api_client.tasks_api.retrieve(task_id, org=org)
assert response.status == HTTPStatus.OK
assert task.size == task_size
else:
status = self._test_cannot_create_task(self._USERNAME, task_spec, data_spec)
assert "No media data found" in status.message

@pytest.mark.with_external_services
@pytest.mark.parametrize(
"cloud_storage_id, manifest, org",
Expand Down
5 changes: 3 additions & 2 deletions tests/python/shared/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ def generate_image_file(filename="image.png", size=(50, 50), color=(0, 0, 0)):
return f


def generate_image_files(count) -> List[BytesIO]:
def generate_image_files(count, prefixes=None) -> List[BytesIO]:
images = []
for i in range(count):
image = generate_image_file(f"{i}.jpeg", color=(i, i, i))
prefix = prefixes[i] if prefixes else ""
image = generate_image_file(f"{prefix}{i}.jpeg", color=(i, i, i))
images.append(image)

return images

0 comments on commit cbc59ef

Please sign in to comment.