Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk preparation optimization #7081

Merged
merged 8 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from cvat.apps.engine.mime_types import mimetypes
from cvat.apps.engine.models import (DataChoice, DimensionType, Job, Image,
StorageChoice, CloudStorage)
from cvat.apps.engine.utils import md5_hash
from cvat.apps.engine.utils import md5_hash, preload_images
from utils.dataset_manifest import ImageManifestManager

slogger = ServerLogManager(__name__)
Expand Down Expand Up @@ -117,7 +117,7 @@ def _get_frame_provider_class():

@staticmethod
@contextmanager
def _get_images(db_data, chunk_number):
def _get_images(db_data, chunk_number, dimension):
images = []
tmp_dir = None
upload_dir = {
Expand Down Expand Up @@ -168,6 +168,8 @@ def _get_images(db_data, chunk_number):
images.append((fs_filename, fs_filename, None))

cloud_storage_instance.bulk_download_to_dir(files=files_to_download, upload_dir=tmp_dir)
if dimension == DimensionType.DIM_2D:
images = preload_images(images)
nmanovic marked this conversation as resolved.
Show resolved Hide resolved

for checksum, (_, fs_filename, _) in zip(checksums, images):
if checksum and not md5_hash(fs_filename) == checksum:
Expand All @@ -176,6 +178,8 @@ def _get_images(db_data, chunk_number):
for item in reader:
source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
images.append((source_path, source_path, None))
if dimension == DimensionType.DIM_2D:
images = preload_images(images)

yield images
finally:
Expand All @@ -199,7 +203,7 @@ def _prepare_task_chunk(self, db_data, quality, chunk_number):
writer = writer_classes[quality](image_quality, **kwargs)

buff = BytesIO()
with self._get_images(db_data, chunk_number) as images:
with self._get_images(db_data, chunk_number, self._dimension) as images:
writer.save_as_chunk(images, buff)
buff.seek(0)

Expand Down
71 changes: 39 additions & 32 deletions cvat/apps/engine/media_extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from enum import IntEnum
from abc import ABC, abstractmethod
from contextlib import closing
from typing import Iterable

import av
import numpy as np
Expand Down Expand Up @@ -587,12 +588,17 @@ def __init__(self, quality, dimension=DimensionType.DIM_2D):
self._dimension = dimension

@staticmethod
def _compress_image(image_path, quality):
if isinstance(image_path, av.VideoFrame):
image = image_path.to_image()
else:
with Image.open(image_path) as source_image:
image = ImageOps.exif_transpose(source_image)
def _compress_image(source_image: av.VideoFrame | io.IOBase | Image.Image, quality: int) -> tuple[int, int, io.BytesIO]:
image = None
if isinstance(source_image, av.VideoFrame):
image = source_image.to_image()
elif isinstance(source_image, io.IOBase):
with Image.open(source_image) as _img:
image = ImageOps.exif_transpose(_img)
elif isinstance(source_image, Image.Image):
image = source_image

assert image is not None

# Ensure image data fits into 8bit per pixel before RGB conversion as PIL clips values on conversion
if image.mode == "I":
Expand All @@ -619,7 +625,7 @@ def _compress_image(image_path, quality):
image = ImageOps.equalize(image) # The Images need equalization. High resolution with 16-bit but only small range that actually contains information

converted_image = image.convert('RGB')
image.close()

try:
buf = io.BytesIO()
converted_image.save(buf, format='JPEG', quality=quality, optimize=True)
Expand All @@ -637,7 +643,7 @@ class ZipChunkWriter(IChunkWriter):
IMAGE_EXT = 'jpeg'
POINT_CLOUD_EXT = 'pcd'

def _write_pcd_file(self, image):
def _write_pcd_file(self, image: str|io.BytesIO) -> tuple[io.BytesIO, str, int, int]:
image_buf = open(image, "rb") if isinstance(image, str) else image
try:
properties = ValidateDimension.get_pcd_properties(image_buf)
Expand All @@ -648,33 +654,32 @@ def _write_pcd_file(self, image):
if isinstance(image, str):
image_buf.close()

def save_as_chunk(self, images, chunk_path):
def save_as_chunk(self, images: Iterable[tuple[Image.Image|io.IOBase|str, str, str]], chunk_path: str):
with zipfile.ZipFile(chunk_path, 'x') as zip_chunk:
for idx, (image, path, _) in enumerate(images):
ext = os.path.splitext(path)[1].replace('.', '')
output = io.BytesIO()
if self._dimension == DimensionType.DIM_2D:
with Image.open(image) as pil_image:
if has_exif_rotation(pil_image):
rot_image = ImageOps.exif_transpose(pil_image)
try:
if rot_image.format == 'TIFF':
# https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html
# use loseless lzw compression for tiff images
rot_image.save(output, format='TIFF', compression='tiff_lzw')
else:
rot_image.save(
output,
format=rot_image.format if rot_image.format else self.IMAGE_EXT,
quality=100,
subsampling=0
)
finally:
rot_image.close()
else:
output = image
if has_exif_rotation(image):
rot_image = ImageOps.exif_transpose(image)
try:
if rot_image.format == 'TIFF':
# https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html
# use loseless lzw compression for tiff images
rot_image.save(output, format='TIFF', compression='tiff_lzw')
else:
rot_image.save(
output,
format=rot_image.format if rot_image.format else self.IMAGE_EXT,
quality=100,
subsampling=0
)
finally:
rot_image.close()
else:
output = path
else:
output, ext = self._write_pcd_file(image)[0:2]
output, ext = self._write_pcd_file(path)[0:2]
arcname = '{:06d}.{}'.format(idx, ext)

if isinstance(output, io.BytesIO):
Expand All @@ -687,11 +692,13 @@ def save_as_chunk(self, images, chunk_path):

class ZipCompressedChunkWriter(ZipChunkWriter):
def save_as_chunk(
self, images, chunk_path, *, compress_frames: bool = True, zip_compress_level: int = 0
self,
images: Iterable[tuple[Image.Image|io.IOBase|str, str, str]],
chunk_path: str, *, compress_frames: bool = True, zip_compress_level: int = 0
):
image_sizes = []
with zipfile.ZipFile(chunk_path, 'x', compresslevel=zip_compress_level) as zip_chunk:
for idx, (image, _, _) in enumerate(images):
for idx, (image, path, _) in enumerate(images):
if self._dimension == DimensionType.DIM_2D:
if compress_frames:
w, h, image_buf = self._compress_image(image, self._image_quality)
Expand All @@ -702,7 +709,7 @@ def save_as_chunk(
w, h = img.size
extension = self.IMAGE_EXT
else:
image_buf, extension, w, h = self._write_pcd_file(image)
image_buf, extension, w, h = self._write_pcd_file(path)
image_sizes.append((w, h))
arcname = '{:06d}.{}'.format(idx, extension)
zip_chunk.writestr(arcname, image_buf.getvalue())
Expand Down
81 changes: 59 additions & 22 deletions cvat/apps/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import fnmatch
import os
import sys
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union, Iterable
from rest_framework.serializers import ValidationError
import rq
import re
Expand All @@ -17,6 +17,8 @@
from urllib import request as urlrequest
import django_rq
import pytz
import concurrent.futures
import queue

from django.conf import settings
from django.db import transaction
Expand All @@ -27,7 +29,7 @@
from cvat.apps.engine.log import ServerLogManager
from cvat.apps.engine.media_extractors import (MEDIA_TYPES, ImageListReader, Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter,
ValidateDimension, ZipChunkWriter, ZipCompressedChunkWriter, get_mime, sort)
from cvat.apps.engine.utils import av_scan_paths,get_rq_job_meta, define_dependent_job, get_rq_lock_by_user
from cvat.apps.engine.utils import av_scan_paths,get_rq_job_meta, define_dependent_job, get_rq_lock_by_user, preload_images
from cvat.utils.http import make_requests_session, PROXIES_FOR_UNTRUSTED_URLS
from utils.dataset_manifest import ImageManifestManager, VideoManifestManager, is_manifest
from utils.dataset_manifest.core import VideoManifestValidator, is_dataset_manifest
Expand Down Expand Up @@ -1025,37 +1027,72 @@ def _update_status(msg):
frame=frame, width=w, height=h)
for (path, frame), (w, h) in zip(chunk_paths, img_sizes)
])

if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM or not settings.USE_CACHE:
counter = itertools.count()
generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
for chunk_idx, chunk_data in generator:
chunk_data = list(chunk_data)
original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)
generator = itertools.groupby(extractor, lambda _: next(counter) // db_data.chunk_size)
generator = ((idx, list(chunk_data)) for idx, chunk_data in generator)

def save_chunks(
executor: concurrent.futures.ThreadPoolExecutor,
chunk_idx: int,
chunk_data: Iterable[tuple[str, str, str]]) -> list[tuple[str, int, tuple[int, int]]]:
nonlocal db_data, db_task, extractor, original_chunk_writer, compressed_chunk_writer
if db_task.dimension == models.DimensionType.DIM_2D and (
isinstance(extractor, MEDIA_TYPES['image']['extractor']) or
isinstance(extractor, MEDIA_TYPES['zip']['extractor']) or
isinstance(extractor, MEDIA_TYPES['pdf']['extractor']) or
isinstance(extractor, MEDIA_TYPES['archive']['extractor'])):
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
chunk_data = preload_images(chunk_data)

fs_original = executor.submit(
original_chunk_writer.save_as_chunk,
images=chunk_data,
chunk_path=db_data.get_original_chunk_path(chunk_idx)
)
fs_compressed = executor.submit(
compressed_chunk_writer.save_as_chunk,
images=chunk_data,
chunk_path=db_data.get_compressed_chunk_path(chunk_idx),
)
fs_original.result()
image_sizes = fs_compressed.result()

# (path, frame, size)
return list((i[0][1], i[0][2], i[1]) for i in zip(chunk_data, image_sizes))

compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
def process_results(img_meta: list[tuple[str, int, tuple[int, int]]]):
nonlocal db_images, db_data, video_path, video_size

if db_task.mode == 'annotation':
db_images.extend([
db_images.extend(
models.Image(
data=db_data,
path=os.path.relpath(data[1], upload_dir),
frame=data[2],
width=size[0],
height=size[1])

for data, size in zip(chunk_data, img_sizes)
])
path=os.path.relpath(frame_path, upload_dir),
frame=frame_number,
width=frame_size[0],
height=frame_size[1])
for frame_path, frame_number, frame_size in img_meta)
else:
video_size = img_sizes[0]
video_path = chunk_data[0][1]
video_size = img_meta[0][2]
video_path = img_meta[0][0]

db_data.size += len(chunk_data)
progress = extractor.get_progress(chunk_data[-1][2])
progress = extractor.get_progress(img_meta[-1][1])
update_progress(progress)

futures = queue.Queue(maxsize=settings.CVAT_CONCURRENT_CHUNK_PROCESSING)
with concurrent.futures.ThreadPoolExecutor(max_workers=2*settings.CVAT_CONCURRENT_CHUNK_PROCESSING) as executor:
for chunk_idx, chunk_data in generator:
db_data.size += len(chunk_data)
if not futures.full():
futures.put(executor.submit(save_chunks, executor, chunk_idx, chunk_data))
continue

process_results(futures.get().result())
futures.put(executor.submit(save_chunks, executor, chunk_idx, chunk_data))
azhavoro marked this conversation as resolved.
Show resolved Hide resolved

while not futures.empty():
process_results(futures.get().result())

if db_task.mode == 'annotation':
models.Image.objects.bulk_create(db_images)
created_images = models.Image.objects.filter(data_id=db_data.id)
Expand Down
10 changes: 9 additions & 1 deletion cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
import traceback
from contextlib import suppress, nullcontext
from typing import Any, Dict, Optional, Callable, Union
from typing import Any, Dict, Optional, Callable, Union, Iterable
import subprocess
import os
import urllib.parse
Expand Down Expand Up @@ -375,3 +375,11 @@ def sendfile(
attachment_filename = make_attachment_file_name(attachment_filename)

return _sendfile(request, filename, attachment, attachment_filename, mimetype, encoding)

def preload_image(image: tuple[str, str, str])-> tuple[Image.Image, str, str]:
pil_img = Image.open(image[0])
pil_img.load()
return pil_img, image[1], image[2]

def preload_images(images: Iterable[tuple[str, str, str]]) -> list[tuple[Image.Image, str, str]]:
return list(map(preload_image, images))
3 changes: 3 additions & 0 deletions cvat/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,3 +720,6 @@ class CVAT_QUEUES(Enum):
EMAIL_BACKEND = None

ONE_RUNNING_JOB_IN_QUEUE_PER_USER = strtobool(os.getenv('ONE_RUNNING_JOB_IN_QUEUE_PER_USER', 'false'))

# How many chunks can be prepared simultaneously during task creation in case the cache is not used
CVAT_CONCURRENT_CHUNK_PROCESSING = int(os.getenv('CVAT_CONCURRENT_CHUNK_PROCESSING', 1))
Loading