Skip to content

Commit

Permalink
Chunk preparation optimization (#7081)
Browse files Browse the repository at this point in the history
This PR speeds up the preparation of chunks by: 
1. loading images once instead of twice in each writer,
2. as well as by allowing simultaneous preparation of more than 1 chunk
using multithreading.
This allows to reduce the time for preparation of chunks for 4895 images
from 0:04:36 to 0:01:20 in case of preparation of 3 chunks in parallel
and 0:02:46 in case of 1 chunk in my environment.

Co-authored-by: Maria Khrustaleva <maya17grd@gmail.com>
  • Loading branch information
azhavoro and Marishka17 authored Nov 2, 2023
1 parent 1f8d5d3 commit 0535d45
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Changed

- Improved performance of chunk preparation when creating tasks
(<https://github.com/opencv/cvat/pull/7081>)
9 changes: 6 additions & 3 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from cvat.apps.engine.mime_types import mimetypes
from cvat.apps.engine.models import (DataChoice, DimensionType, Job, Image,
StorageChoice, CloudStorage)
from cvat.apps.engine.utils import md5_hash
from cvat.apps.engine.utils import md5_hash, preload_images
from utils.dataset_manifest import ImageManifestManager

slogger = ServerLogManager(__name__)
Expand Down Expand Up @@ -117,7 +117,7 @@ def _get_frame_provider_class():

@staticmethod
@contextmanager
def _get_images(db_data, chunk_number):
def _get_images(db_data, chunk_number, dimension):
images = []
tmp_dir = None
upload_dir = {
Expand Down Expand Up @@ -168,6 +168,7 @@ def _get_images(db_data, chunk_number):
images.append((fs_filename, fs_filename, None))

cloud_storage_instance.bulk_download_to_dir(files=files_to_download, upload_dir=tmp_dir)
images = preload_images(images)

for checksum, (_, fs_filename, _) in zip(checksums, images):
if checksum and not md5_hash(fs_filename) == checksum:
Expand All @@ -176,6 +177,8 @@ def _get_images(db_data, chunk_number):
for item in reader:
source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
images.append((source_path, source_path, None))
if dimension == DimensionType.DIM_2D:
images = preload_images(images)

yield images
finally:
Expand All @@ -199,7 +202,7 @@ def _prepare_task_chunk(self, db_data, quality, chunk_number):
writer = writer_classes[quality](image_quality, **kwargs)

buff = BytesIO()
with self._get_images(db_data, chunk_number) as images:
with self._get_images(db_data, chunk_number, self._dimension) as images:
writer.save_as_chunk(images, buff)
buff.seek(0)

Expand Down
71 changes: 39 additions & 32 deletions cvat/apps/engine/media_extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from enum import IntEnum
from abc import ABC, abstractmethod
from contextlib import closing
from typing import Iterable

import av
import numpy as np
Expand Down Expand Up @@ -587,12 +588,17 @@ def __init__(self, quality, dimension=DimensionType.DIM_2D):
self._dimension = dimension

@staticmethod
def _compress_image(image_path, quality):
if isinstance(image_path, av.VideoFrame):
image = image_path.to_image()
else:
with Image.open(image_path) as source_image:
image = ImageOps.exif_transpose(source_image)
def _compress_image(source_image: av.VideoFrame | io.IOBase | Image.Image, quality: int) -> tuple[int, int, io.BytesIO]:
image = None
if isinstance(source_image, av.VideoFrame):
image = source_image.to_image()
elif isinstance(source_image, io.IOBase):
with Image.open(source_image) as _img:
image = ImageOps.exif_transpose(_img)
elif isinstance(source_image, Image.Image):
image = source_image

assert image is not None

# Ensure image data fits into 8bit per pixel before RGB conversion as PIL clips values on conversion
if image.mode == "I":
Expand All @@ -619,7 +625,7 @@ def _compress_image(image_path, quality):
image = ImageOps.equalize(image) # The Images need equalization. High resolution with 16-bit but only small range that actually contains information

converted_image = image.convert('RGB')
image.close()

try:
buf = io.BytesIO()
converted_image.save(buf, format='JPEG', quality=quality, optimize=True)
Expand All @@ -637,7 +643,7 @@ class ZipChunkWriter(IChunkWriter):
IMAGE_EXT = 'jpeg'
POINT_CLOUD_EXT = 'pcd'

def _write_pcd_file(self, image):
def _write_pcd_file(self, image: str|io.BytesIO) -> tuple[io.BytesIO, str, int, int]:
image_buf = open(image, "rb") if isinstance(image, str) else image
try:
properties = ValidateDimension.get_pcd_properties(image_buf)
Expand All @@ -648,33 +654,32 @@ def _write_pcd_file(self, image):
if isinstance(image, str):
image_buf.close()

def save_as_chunk(self, images, chunk_path):
def save_as_chunk(self, images: Iterable[tuple[Image.Image|io.IOBase|str, str, str]], chunk_path: str):
with zipfile.ZipFile(chunk_path, 'x') as zip_chunk:
for idx, (image, path, _) in enumerate(images):
ext = os.path.splitext(path)[1].replace('.', '')
output = io.BytesIO()
if self._dimension == DimensionType.DIM_2D:
with Image.open(image) as pil_image:
if has_exif_rotation(pil_image):
rot_image = ImageOps.exif_transpose(pil_image)
try:
if rot_image.format == 'TIFF':
# https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html
# use loseless lzw compression for tiff images
rot_image.save(output, format='TIFF', compression='tiff_lzw')
else:
rot_image.save(
output,
format=rot_image.format if rot_image.format else self.IMAGE_EXT,
quality=100,
subsampling=0
)
finally:
rot_image.close()
else:
output = image
if has_exif_rotation(image):
rot_image = ImageOps.exif_transpose(image)
try:
if rot_image.format == 'TIFF':
# https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html
# use loseless lzw compression for tiff images
rot_image.save(output, format='TIFF', compression='tiff_lzw')
else:
rot_image.save(
output,
format=rot_image.format if rot_image.format else self.IMAGE_EXT,
quality=100,
subsampling=0
)
finally:
rot_image.close()
else:
output = path
else:
output, ext = self._write_pcd_file(image)[0:2]
output, ext = self._write_pcd_file(path)[0:2]
arcname = '{:06d}.{}'.format(idx, ext)

if isinstance(output, io.BytesIO):
Expand All @@ -687,11 +692,13 @@ def save_as_chunk(self, images, chunk_path):

class ZipCompressedChunkWriter(ZipChunkWriter):
def save_as_chunk(
self, images, chunk_path, *, compress_frames: bool = True, zip_compress_level: int = 0
self,
images: Iterable[tuple[Image.Image|io.IOBase|str, str, str]],
chunk_path: str, *, compress_frames: bool = True, zip_compress_level: int = 0
):
image_sizes = []
with zipfile.ZipFile(chunk_path, 'x', compresslevel=zip_compress_level) as zip_chunk:
for idx, (image, _, _) in enumerate(images):
for idx, (image, path, _) in enumerate(images):
if self._dimension == DimensionType.DIM_2D:
if compress_frames:
w, h, image_buf = self._compress_image(image, self._image_quality)
Expand All @@ -702,7 +709,7 @@ def save_as_chunk(
w, h = img.size
extension = self.IMAGE_EXT
else:
image_buf, extension, w, h = self._write_pcd_file(image)
image_buf, extension, w, h = self._write_pcd_file(path)
image_sizes.append((w, h))
arcname = '{:06d}.{}'.format(idx, extension)
zip_chunk.writestr(arcname, image_buf.getvalue())
Expand Down
80 changes: 58 additions & 22 deletions cvat/apps/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import fnmatch
import os
import sys
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union, Iterable
from rest_framework.serializers import ValidationError
import rq
import re
Expand All @@ -17,6 +17,8 @@
from urllib import request as urlrequest
import django_rq
import pytz
import concurrent.futures
import queue

from django.conf import settings
from django.db import transaction
Expand All @@ -27,7 +29,7 @@
from cvat.apps.engine.log import ServerLogManager
from cvat.apps.engine.media_extractors import (MEDIA_TYPES, ImageListReader, Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter,
ValidateDimension, ZipChunkWriter, ZipCompressedChunkWriter, get_mime, sort)
from cvat.apps.engine.utils import av_scan_paths,get_rq_job_meta, define_dependent_job, get_rq_lock_by_user
from cvat.apps.engine.utils import av_scan_paths,get_rq_job_meta, define_dependent_job, get_rq_lock_by_user, preload_images
from cvat.utils.http import make_requests_session, PROXIES_FOR_UNTRUSTED_URLS
from utils.dataset_manifest import ImageManifestManager, VideoManifestManager, is_manifest
from utils.dataset_manifest.core import VideoManifestValidator, is_dataset_manifest
Expand Down Expand Up @@ -1025,37 +1027,71 @@ def _update_status(msg):
frame=frame, width=w, height=h)
for (path, frame), (w, h) in zip(chunk_paths, img_sizes)
])

if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM or not settings.USE_CACHE:
counter = itertools.count()
generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
for chunk_idx, chunk_data in generator:
chunk_data = list(chunk_data)
original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)
generator = itertools.groupby(extractor, lambda _: next(counter) // db_data.chunk_size)
generator = ((idx, list(chunk_data)) for idx, chunk_data in generator)

def save_chunks(
executor: concurrent.futures.ThreadPoolExecutor,
chunk_idx: int,
chunk_data: Iterable[tuple[str, str, str]]) -> list[tuple[str, int, tuple[int, int]]]:
nonlocal db_data, db_task, extractor, original_chunk_writer, compressed_chunk_writer
if (db_task.dimension == models.DimensionType.DIM_2D and
isinstance(extractor, (
MEDIA_TYPES['image']['extractor'],
MEDIA_TYPES['zip']['extractor'],
MEDIA_TYPES['pdf']['extractor'],
MEDIA_TYPES['archive']['extractor'],
))):
chunk_data = preload_images(chunk_data)

fs_original = executor.submit(
original_chunk_writer.save_as_chunk,
images=chunk_data,
chunk_path=db_data.get_original_chunk_path(chunk_idx)
)
fs_compressed = executor.submit(
compressed_chunk_writer.save_as_chunk,
images=chunk_data,
chunk_path=db_data.get_compressed_chunk_path(chunk_idx),
)
fs_original.result()
image_sizes = fs_compressed.result()

# (path, frame, size)
return list((i[0][1], i[0][2], i[1]) for i in zip(chunk_data, image_sizes))

compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
def process_results(img_meta: list[tuple[str, int, tuple[int, int]]]):
nonlocal db_images, db_data, video_path, video_size

if db_task.mode == 'annotation':
db_images.extend([
db_images.extend(
models.Image(
data=db_data,
path=os.path.relpath(data[1], upload_dir),
frame=data[2],
width=size[0],
height=size[1])

for data, size in zip(chunk_data, img_sizes)
])
path=os.path.relpath(frame_path, upload_dir),
frame=frame_number,
width=frame_size[0],
height=frame_size[1])
for frame_path, frame_number, frame_size in img_meta)
else:
video_size = img_sizes[0]
video_path = chunk_data[0][1]
video_size = img_meta[0][2]
video_path = img_meta[0][0]

db_data.size += len(chunk_data)
progress = extractor.get_progress(chunk_data[-1][2])
progress = extractor.get_progress(img_meta[-1][1])
update_progress(progress)

futures = queue.Queue(maxsize=settings.CVAT_CONCURRENT_CHUNK_PROCESSING)
with concurrent.futures.ThreadPoolExecutor(max_workers=2*settings.CVAT_CONCURRENT_CHUNK_PROCESSING) as executor:
for chunk_idx, chunk_data in generator:
db_data.size += len(chunk_data)
if futures.full():
process_results(futures.get().result())
futures.put(executor.submit(save_chunks, executor, chunk_idx, chunk_data))

while not futures.empty():
process_results(futures.get().result())

if db_task.mode == 'annotation':
models.Image.objects.bulk_create(db_images)
created_images = models.Image.objects.filter(data_id=db_data.id)
Expand Down
10 changes: 9 additions & 1 deletion cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
import traceback
from contextlib import suppress, nullcontext
from typing import Any, Dict, Optional, Callable, Union
from typing import Any, Dict, Optional, Callable, Union, Iterable
import subprocess
import os
import urllib.parse
Expand Down Expand Up @@ -375,3 +375,11 @@ def sendfile(
attachment_filename = make_attachment_file_name(attachment_filename)

return _sendfile(request, filename, attachment, attachment_filename, mimetype, encoding)

def preload_image(image: tuple[str, str, str])-> tuple[Image.Image, str, str]:
pil_img = Image.open(image[0])
pil_img.load()
return pil_img, image[1], image[2]

def preload_images(images: Iterable[tuple[str, str, str]]) -> list[tuple[Image.Image, str, str]]:
return list(map(preload_image, images))
3 changes: 3 additions & 0 deletions cvat/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,3 +720,6 @@ class CVAT_QUEUES(Enum):
EMAIL_BACKEND = None

ONE_RUNNING_JOB_IN_QUEUE_PER_USER = strtobool(os.getenv('ONE_RUNNING_JOB_IN_QUEUE_PER_USER', 'false'))

# How many chunks can be prepared simultaneously during task creation in case the cache is not used
CVAT_CONCURRENT_CHUNK_PROCESSING = int(os.getenv('CVAT_CONCURRENT_CHUNK_PROCESSING', 1))

0 comments on commit 0535d45

Please sign in to comment.