diff --git a/changelog.d/20231102_105602_andrey_optimization_creation_of_tasks.md b/changelog.d/20231102_105602_andrey_optimization_creation_of_tasks.md
new file mode 100644
index 000000000000..44fd5f67cef9
--- /dev/null
+++ b/changelog.d/20231102_105602_andrey_optimization_creation_of_tasks.md
@@ -0,0 +1,4 @@
+### Changed
+
+- Improved performance of chunk preparation when creating tasks
+  ()
diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py
index 6f88ed51290c..a1139b4bf16e 100644
--- a/cvat/apps/engine/cache.py
+++ b/cvat/apps/engine/cache.py
@@ -33,7 +33,7 @@
 from cvat.apps.engine.mime_types import mimetypes
 from cvat.apps.engine.models import (DataChoice, DimensionType, Job, Image,
     StorageChoice, CloudStorage)
-from cvat.apps.engine.utils import md5_hash
+from cvat.apps.engine.utils import md5_hash, preload_images
 from utils.dataset_manifest import ImageManifestManager
 
 slogger = ServerLogManager(__name__)
@@ -117,7 +117,7 @@ def _get_frame_provider_class():
 
     @staticmethod
     @contextmanager
-    def _get_images(db_data, chunk_number):
+    def _get_images(db_data, chunk_number, dimension):
         images = []
         tmp_dir = None
         upload_dir = {
@@ -168,6 +168,7 @@ def _get_images(db_data, chunk_number):
                     images.append((fs_filename, fs_filename, None))
 
                 cloud_storage_instance.bulk_download_to_dir(files=files_to_download, upload_dir=tmp_dir)
+                images = preload_images(images)
 
                 for checksum, (_, fs_filename, _) in zip(checksums, images):
                     if checksum and not md5_hash(fs_filename) == checksum:
@@ -176,6 +177,8 @@ def _get_images(db_data, chunk_number):
                 for item in reader:
                     source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
                     images.append((source_path, source_path, None))
+                if dimension == DimensionType.DIM_2D:
+                    images = preload_images(images)
 
             yield images
         finally:
@@ -199,7 +202,7 @@ def _prepare_task_chunk(self, db_data, quality, chunk_number):
         writer = writer_classes[quality](image_quality, **kwargs)
 
         buff = BytesIO()
-        with self._get_images(db_data, chunk_number) as images:
+        with self._get_images(db_data, chunk_number, self._dimension) as images:
             writer.save_as_chunk(images, buff)
         buff.seek(0)
 
diff --git a/cvat/apps/engine/media_extractors.py b/cvat/apps/engine/media_extractors.py
index 5f64a93f3024..3de0c0829d4d 100644
--- a/cvat/apps/engine/media_extractors.py
+++ b/cvat/apps/engine/media_extractors.py
@@ -12,6 +12,7 @@
 from enum import IntEnum
 from abc import ABC, abstractmethod
 from contextlib import closing
+from typing import Iterable
 
 import av
 import numpy as np
@@ -587,12 +588,17 @@ def __init__(self, quality, dimension=DimensionType.DIM_2D):
         self._dimension = dimension
 
     @staticmethod
-    def _compress_image(image_path, quality):
-        if isinstance(image_path, av.VideoFrame):
-            image = image_path.to_image()
-        else:
-            with Image.open(image_path) as source_image:
-                image = ImageOps.exif_transpose(source_image)
+    def _compress_image(source_image: av.VideoFrame | io.IOBase | Image.Image, quality: int) -> tuple[int, int, io.BytesIO]:
+        image = None
+        if isinstance(source_image, av.VideoFrame):
+            image = source_image.to_image()
+        elif isinstance(source_image, io.IOBase):
+            with Image.open(source_image) as _img:
+                image = ImageOps.exif_transpose(_img)
+        elif isinstance(source_image, Image.Image):
+            image = source_image
+
+        assert image is not None
 
         # Ensure image data fits into 8bit per pixel before RGB conversion as PIL clips values on conversion
         if image.mode == "I":
@@ -619,7 +625,7 @@ def _compress_image(image_path, quality):
             image = ImageOps.equalize(image) # The Images need equalization. High resolution with 16-bit but only small range that actually contains information
 
         converted_image = image.convert('RGB')
-        image.close()
+
         try:
             buf = io.BytesIO()
             converted_image.save(buf, format='JPEG', quality=quality, optimize=True)
@@ -637,7 +643,7 @@ class ZipChunkWriter(IChunkWriter):
     IMAGE_EXT = 'jpeg'
     POINT_CLOUD_EXT = 'pcd'
 
-    def _write_pcd_file(self, image):
+    def _write_pcd_file(self, image: str|io.BytesIO) -> tuple[io.BytesIO, str, int, int]:
         image_buf = open(image, "rb") if isinstance(image, str) else image
         try:
             properties = ValidateDimension.get_pcd_properties(image_buf)
@@ -648,33 +654,32 @@ def _write_pcd_file(self, image):
             if isinstance(image, str):
                 image_buf.close()
 
-    def save_as_chunk(self, images, chunk_path):
+    def save_as_chunk(self, images: Iterable[tuple[Image.Image|io.IOBase|str, str, str]], chunk_path: str):
         with zipfile.ZipFile(chunk_path, 'x') as zip_chunk:
             for idx, (image, path, _) in enumerate(images):
                 ext = os.path.splitext(path)[1].replace('.', '')
                 output = io.BytesIO()
                 if self._dimension == DimensionType.DIM_2D:
-                    with Image.open(image) as pil_image:
-                        if has_exif_rotation(pil_image):
-                            rot_image = ImageOps.exif_transpose(pil_image)
-                            try:
-                                if rot_image.format == 'TIFF':
-                                    # https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html
-                                    # use loseless lzw compression for tiff images
-                                    rot_image.save(output, format='TIFF', compression='tiff_lzw')
-                                else:
-                                    rot_image.save(
-                                        output,
-                                        format=rot_image.format if rot_image.format else self.IMAGE_EXT,
-                                        quality=100,
-                                        subsampling=0
-                                    )
-                            finally:
-                                rot_image.close()
-                        else:
-                            output = image
+                    if has_exif_rotation(image):
+                        rot_image = ImageOps.exif_transpose(image)
+                        try:
+                            if rot_image.format == 'TIFF':
+                                # https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html
+                                # use lossless lzw compression for tiff images
+                                rot_image.save(output, format='TIFF', compression='tiff_lzw')
+                            else:
+                                rot_image.save(
+                                    output,
+                                    format=rot_image.format if rot_image.format else self.IMAGE_EXT,
+                                    quality=100,
+                                    subsampling=0
+                                )
+                        finally:
+                            rot_image.close()
+                    else:
+                        output = path
                 else:
-                    output, ext = self._write_pcd_file(image)[0:2]
+                    output, ext = self._write_pcd_file(path)[0:2]
                 arcname = '{:06d}.{}'.format(idx, ext)
 
                 if isinstance(output, io.BytesIO):
@@ -687,11 +692,13 @@ def save_as_chunk(self, images, chunk_path):
 
 class ZipCompressedChunkWriter(ZipChunkWriter):
     def save_as_chunk(
-        self, images, chunk_path, *, compress_frames: bool = True, zip_compress_level: int = 0
+        self,
+        images: Iterable[tuple[Image.Image|io.IOBase|str, str, str]],
+        chunk_path: str, *, compress_frames: bool = True, zip_compress_level: int = 0
     ):
         image_sizes = []
         with zipfile.ZipFile(chunk_path, 'x', compresslevel=zip_compress_level) as zip_chunk:
-            for idx, (image, _, _) in enumerate(images):
+            for idx, (image, path, _) in enumerate(images):
                 if self._dimension == DimensionType.DIM_2D:
                     if compress_frames:
                         w, h, image_buf = self._compress_image(image, self._image_quality)
@@ -702,7 +709,7 @@ def save_as_chunk(
                         w, h = img.size
                     extension = self.IMAGE_EXT
                 else:
-                    image_buf, extension, w, h = self._write_pcd_file(image)
+                    image_buf, extension, w, h = self._write_pcd_file(path)
                 image_sizes.append((w, h))
                 arcname = '{:06d}.{}'.format(idx, extension)
                 zip_chunk.writestr(arcname, image_buf.getvalue())
diff --git a/cvat/apps/engine/task.py b/cvat/apps/engine/task.py
index f9956e7a4e77..a1194c62c2ac 100644
--- a/cvat/apps/engine/task.py
+++ b/cvat/apps/engine/task.py
@@ -7,7 +7,7 @@
 import fnmatch
 import os
 import sys
-from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
+from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union, Iterable
 from rest_framework.serializers import ValidationError
 import rq
 import re
@@ -17,6 +17,8 @@
 from urllib import request as urlrequest
 import django_rq
 import pytz
+import concurrent.futures
+import queue
 
 from django.conf import settings
 from django.db import transaction
@@ -27,7 +29,7 @@
 from cvat.apps.engine.log import ServerLogManager
 from cvat.apps.engine.media_extractors import (MEDIA_TYPES, ImageListReader, Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter,
     ValidateDimension, ZipChunkWriter, ZipCompressedChunkWriter, get_mime, sort)
-from cvat.apps.engine.utils import av_scan_paths,get_rq_job_meta, define_dependent_job, get_rq_lock_by_user
+from cvat.apps.engine.utils import av_scan_paths, get_rq_job_meta, define_dependent_job, get_rq_lock_by_user, preload_images
 from cvat.utils.http import make_requests_session, PROXIES_FOR_UNTRUSTED_URLS
 from utils.dataset_manifest import ImageManifestManager, VideoManifestManager, is_manifest
 from utils.dataset_manifest.core import VideoManifestValidator, is_dataset_manifest
@@ -1025,37 +1027,71 @@ def _update_status(msg):
                     frame=frame, width=w, height=h)
                 for (path, frame), (w, h) in zip(chunk_paths, img_sizes)
             ])
-
     if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM or not settings.USE_CACHE:
         counter = itertools.count()
-        generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
-        for chunk_idx, chunk_data in generator:
-            chunk_data = list(chunk_data)
-            original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
-            original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)
+        generator = itertools.groupby(extractor, lambda _: next(counter) // db_data.chunk_size)
+        generator = ((idx, list(chunk_data)) for idx, chunk_data in generator)
+
+        def save_chunks(
+            executor: concurrent.futures.ThreadPoolExecutor,
+            chunk_idx: int,
+            chunk_data: Iterable[tuple[str, str, str]]) -> list[tuple[str, int, tuple[int, int]]]:
+            nonlocal db_data, db_task, extractor, original_chunk_writer, compressed_chunk_writer
+            if (db_task.dimension == models.DimensionType.DIM_2D and
+                isinstance(extractor, (
+                    MEDIA_TYPES['image']['extractor'],
+                    MEDIA_TYPES['zip']['extractor'],
+                    MEDIA_TYPES['pdf']['extractor'],
+                    MEDIA_TYPES['archive']['extractor'],
+                ))):
+                chunk_data = preload_images(chunk_data)
+
+            fs_original = executor.submit(
+                original_chunk_writer.save_as_chunk,
+                images=chunk_data,
+                chunk_path=db_data.get_original_chunk_path(chunk_idx)
+            )
+            fs_compressed = executor.submit(
+                compressed_chunk_writer.save_as_chunk,
+                images=chunk_data,
+                chunk_path=db_data.get_compressed_chunk_path(chunk_idx),
+            )
+            fs_original.result()
+            image_sizes = fs_compressed.result()
+
+            # (path, frame, size)
+            return list((i[0][1], i[0][2], i[1]) for i in zip(chunk_data, image_sizes))
 
-            compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
-            img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
+        def process_results(img_meta: list[tuple[str, int, tuple[int, int]]]):
+            nonlocal db_images, db_data, video_path, video_size
 
             if db_task.mode == 'annotation':
-                db_images.extend([
+                db_images.extend(
                     models.Image(
                         data=db_data,
-                        path=os.path.relpath(data[1], upload_dir),
-                        frame=data[2],
-                        width=size[0],
-                        height=size[1])
-
-                    for data, size in zip(chunk_data, img_sizes)
-                ])
+                        path=os.path.relpath(frame_path, upload_dir),
+                        frame=frame_number,
+                        width=frame_size[0],
+                        height=frame_size[1])
+                    for frame_path, frame_number, frame_size in img_meta)
             else:
-                video_size = img_sizes[0]
-                video_path = chunk_data[0][1]
+                video_size = img_meta[0][2]
+                video_path = img_meta[0][0]
 
-            db_data.size += len(chunk_data)
-            progress = extractor.get_progress(chunk_data[-1][2])
+            progress = extractor.get_progress(img_meta[-1][1])
             update_progress(progress)
 
+        futures = queue.Queue(maxsize=settings.CVAT_CONCURRENT_CHUNK_PROCESSING)
+        with concurrent.futures.ThreadPoolExecutor(max_workers=2*settings.CVAT_CONCURRENT_CHUNK_PROCESSING) as executor:
+            for chunk_idx, chunk_data in generator:
+                db_data.size += len(chunk_data)
+                if futures.full():
+                    process_results(futures.get().result())
+                futures.put(executor.submit(save_chunks, executor, chunk_idx, chunk_data))
+
+            while not futures.empty():
+                process_results(futures.get().result())
+
     if db_task.mode == 'annotation':
         models.Image.objects.bulk_create(db_images)
         created_images = models.Image.objects.filter(data_id=db_data.id)
diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py
index 0e17b24dd788..0a1b29801907 100644
--- a/cvat/apps/engine/utils.py
+++ b/cvat/apps/engine/utils.py
@@ -10,7 +10,7 @@
 import sys
 import traceback
 from contextlib import suppress, nullcontext
-from typing import Any, Dict, Optional, Callable, Union
+from typing import Any, Dict, Optional, Callable, Union, Iterable
 import subprocess
 import os
 import urllib.parse
@@ -375,3 +375,11 @@ def sendfile(
         attachment_filename = make_attachment_file_name(attachment_filename)
 
     return _sendfile(request, filename, attachment, attachment_filename, mimetype, encoding)
+
+def preload_image(image: tuple[str, str, str]) -> tuple[Image.Image, str, str]:
+    pil_img = Image.open(image[0])
+    pil_img.load()
+    return pil_img, image[1], image[2]
+
+def preload_images(images: Iterable[tuple[str, str, str]]) -> list[tuple[Image.Image, str, str]]:
+    return list(map(preload_image, images))
diff --git a/cvat/settings/base.py b/cvat/settings/base.py
index 24eefc23fca9..89f5f1e2c1b8 100644
--- a/cvat/settings/base.py
+++ b/cvat/settings/base.py
@@ -720,3 +720,6 @@ class CVAT_QUEUES(Enum):
 EMAIL_BACKEND = None
 
 ONE_RUNNING_JOB_IN_QUEUE_PER_USER = strtobool(os.getenv('ONE_RUNNING_JOB_IN_QUEUE_PER_USER', 'false'))
+
+# How many chunks can be prepared simultaneously during task creation when the cache is not used
+CVAT_CONCURRENT_CHUNK_PROCESSING = int(os.getenv('CVAT_CONCURRENT_CHUNK_PROCESSING', 1))
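
Note on why the new `preload_images` helper speeds things up: PIL opens images lazily, so `Image.open()` only parses the header and the expensive pixel decode otherwise happens later, inside `save_as_chunk`, on the serialized writing path. Calling `load()` eagerly moves that decode up front. A minimal standalone illustration of this behavior (the file name is a hypothetical placeholder, not a path from this patch):

# PIL decodes lazily; load() forces the decode immediately, which is
# what utils.preload_images relies on. "example.png" is a placeholder.
from PIL import Image

img = Image.open("example.png")  # reads only the header
print(img.size)                  # size is known without decoding pixels
img.load()                       # pixel data is actually decoded here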
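
The task.py hunk bounds memory with a FIFO `queue.Queue` of in-flight futures: at most `CVAT_CONCURRENT_CHUNK_PROCESSING` chunks are being prepared at once, and results are consumed in submission order, which is what lets `process_results` report progress from the last frame of each chunk. A self-contained sketch of the pattern, where `make_chunk`, `handle_result`, and `CONCURRENCY` are hypothetical stand-ins for `save_chunks`, `process_results`, and the new setting:

# Bounded producer/consumer pipeline over a thread pool, as in task.py.
# Worker count is doubled because in the PR each chunk task fans out into
# two nested writer jobs (original + compressed) on the same executor;
# this sketch keeps the doubling but does no nested submission.
import concurrent.futures
import queue

CONCURRENCY = 2  # stand-in for settings.CVAT_CONCURRENT_CHUNK_PROCESSING

def make_chunk(idx, frames):
    # stand-in for save_chunks(): write one chunk, return its metadata
    return idx, len(frames)

def handle_result(meta):
    # stand-in for process_results(): runs on the submitting thread
    print("chunk", meta[0], "->", meta[1], "frames")

def run(chunks):
    futures = queue.Queue(maxsize=CONCURRENCY)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2 * CONCURRENCY) as executor:
        for idx, frames in chunks:
            if futures.full():  # block: never more than CONCURRENCY chunks in flight
                handle_result(futures.get().result())
            futures.put(executor.submit(make_chunk, idx, frames))
        while not futures.empty():  # drain the tail of the pipeline, still in order
            handle_result(futures.get().result())

run(enumerate([["a", "b"], ["c", "d"], ["e"]]))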