
Commit

Clean up
Signed-off-by: Kim, Vinnam <vinnam.kim@intel.com>
vinnamkim committed Oct 16, 2023
1 parent 96627c1 commit 7c0e5b6
Showing 13 changed files with 141 additions and 466 deletions.
7 changes: 2 additions & 5 deletions docs/source/docs/data-formats/formats/arrow.md
@@ -178,13 +178,10 @@ Extra options for exporting to Arrow format:
- `JPEG/95`: [JPEG](https://en.wikipedia.org/wiki/JPEG) with 95 quality
- `JPEG/75`: [JPEG](https://en.wikipedia.org/wiki/JPEG) with 75 quality
- `NONE`: skip saving image.
- `--max-chunk-size MAX_CHUNK_SIZE` allows specifying the maximum chunk size (batch size) when saving into Arrow format.
- `--max-shard-size MAX_SHARD_SIZE` allows specifying the maximum number of dataset items stored in each shard when saving into Arrow format.
(default: `1000`)
- `--num-shards NUM_SHARDS` allows specifying the number of shards to generate.
`--num-shards` and `--max-shard-size` are mutually exclusive.
(default: `1`)
- `--max-shard-size MAX_SHARD_SIZE` allows specifying the maximum size of each shard (e.g. 7KB = 7 \* 2^10, 3MB = 3 \* 2^20, and 2GB = 2 \* 2^30 bytes).
`--num-shards` and `--max-shard-size` are mutually exclusive.
(default: `None`)
- `--num-workers NUM_WORKERS` allows using multiprocessing for the export; if `num_workers` = 0, multiprocessing is not used (default: `0`). See the export example below.
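
A minimal usage sketch for the options above, assuming the standard Datumaro Python API (`Dataset.import_from` / `Dataset.export`); the input/output paths are illustrative and the keyword arguments are assumed to mirror the CLI flags:

```python
from datumaro import Dataset

# Illustrative paths and input format; replace with your own dataset.
dataset = Dataset.import_from("path/to/coco_dataset", "coco")

dataset.export(
    "path/to/arrow_out",
    "arrow",
    save_media=True,
    image_ext="JPEG/75",    # one of the image encodings listed above
    max_shard_size=1000,    # at most 1000 items per shard file (assumed kwarg name)
    num_workers=4,          # use 4 worker processes for the export
)
```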

6 changes: 3 additions & 3 deletions src/datumaro/components/dataset_base.py
@@ -178,13 +178,13 @@ def media_type(_):

return _DatasetFilter()

def infos(self):
def infos(self) -> DatasetInfo:
return {}

def categories(self):
def categories(self) -> CategoriesInfo:
return {}

def get(self, id, subset=None):
def get(self, id, subset=None) -> Optional[DatasetItem]:
subset = subset or DEFAULT_SUBSET_NAME
for item in self:
if item.id == id and item.subset == subset:
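
For context, the default `get()` shown above now advertises an `Optional[DatasetItem]` return. A hedged usage sketch (the item id and subset name are illustrative, not taken from the commit):

```python
# `dataset` is any IDataset implementation; None means "not found".
item = dataset.get("img_0001", subset="train")
if item is None:
    print("no such item in subset 'train'")
else:
    print(item.id, item.subset, len(item.annotations))
```
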
85 changes: 43 additions & 42 deletions src/datumaro/plugins/data_formats/arrow/base.py
@@ -2,40 +2,36 @@
#
# SPDX-License-Identifier: MIT

from collections import defaultdict
from dataclasses import dataclass
import os.path as osp
import struct
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Type

import pyarrow as pa
from datumaro.components.annotation import AnnotationType, Categories

from datumaro.components.dataset_base import DatasetBase, DatasetItem, IDataset, SubsetBase
from datumaro.components.errors import MediaTypeError
from datumaro.components.annotation import AnnotationType, Categories
from datumaro.components.dataset_base import (
CategoriesInfo,
DatasetBase,
DatasetInfo,
DatasetItem,
IDataset,
SubsetBase,
)
from datumaro.components.importer import ImportContext
from datumaro.components.media import Image, MediaElement, MediaType
from datumaro.components.merge import get_merger
from datumaro.components.merge.extractor_merger import check_identicalness
from datumaro.plugins.data_formats.arrow.format import DatumaroArrow
from datumaro.plugins.data_formats.datumaro.base import JsonReader
from datumaro.plugins.data_formats.datumaro_binary.mapper.common import DictMapper
from datumaro.util.definitions import DEFAULT_SUBSET_NAME
import weakref

from .mapper.dataset_item import DatasetItemMapper


class ArrowSubsetBase(SubsetBase):
"""
A base class for simple, single-subset extractors.
Should be used by default for user-defined extractors.
"""

def __init__(
self,
lookup: Dict[str, int],
table: pa.Table,
lookup: Dict[str, DatasetItem],
infos: Dict[str, Any],
categories: Dict[AnnotationType, Categories],
subset: str,
@@ -44,24 +40,22 @@ def __init__(
super().__init__(length=len(lookup), subset=subset, media_type=media_type, ctx=None)

self._lookup = lookup
self._table = table
self._infos = infos
self._categories = categories

def __iter__(self) -> Iterator[DatasetItem]:
for table_idx in self._lookup.values():
yield DatasetItemMapper.backward(self._table[table_idx], self._table)
for item in self._lookup.values():
yield item

def __len__(self):
def __len__(self) -> int:
return len(self._lookup)

def get(self, item_id: str, subset: Optional[str] = None):
def get(self, item_id: str, subset: Optional[str] = None) -> Optional[DatasetItem]:
if subset != self._subset:
return None

try:
table_idx = self._lookup[item_id]
return DatasetItemMapper.backward(table_idx, self._table)
return self._lookup[item_id]
except KeyError:
return None
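
An illustrative sketch of the new lookup layout (not code from the commit): each subset now keeps a plain dict from item id to the already-deserialized `DatasetItem`, so `get()` is a dictionary hit instead of a table-row decode. The ids, subset name, and media type below are assumptions:

```python
from datumaro.components.dataset_base import DatasetItem
from datumaro.components.media import Image
from datumaro.plugins.data_formats.arrow.base import ArrowSubsetBase

lookup = {
    "img_0001": DatasetItem(id="img_0001", subset="train"),
    "img_0002": DatasetItem(id="img_0002", subset="train"),
}
subset_base = ArrowSubsetBase(
    lookup=lookup, infos={}, categories={}, subset="train", media_type=Image
)

assert subset_base.get("img_0001", subset="train").id == "img_0001"
assert subset_base.get("img_0001", subset="val") is None   # wrong subset
assert subset_base.get("missing", subset="train") is None  # unknown id
```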

@@ -94,7 +88,7 @@ def __init__(
self._infos = check_identicalness([metadata.infos for metadata in metadatas])
self._categories = check_identicalness([metadata.categories for metadata in metadatas])

self._init_cache(table)
self._init_cache(file_paths, subsets)

@staticmethod
def _load_schema_metadata(table: pa.Table) -> Metadata:
@@ -113,46 +107,53 @@ def _load_schema_metadata(table: pa.Table) -> Metadata:

return Metadata(infos=infos, categories=categories, media_type=media_type)

def infos(self):
def infos(self) -> DatasetInfo:
return self._infos

def categories(self):
def categories(self) -> CategoriesInfo:
return self._categories

def __iter__(self) -> Iterator[DatasetItem]:
for idx in range(len(self)):
yield DatasetItemMapper.backward(idx, self._table)

def _init_cache(self, table: pa.Table):
self._lookup = defaultdict(dict)

for idx, (item_id, item_subset) in enumerate(
zip(
table.column(DatumaroArrow.ID_FIELD),
table.column(DatumaroArrow.SUBSET_FIELD),
)
):
self._lookup[item_subset.as_py()][item_id.as_py()] = idx
for lookup in self._lookup.values():
for item in lookup.values():
yield item

def _init_cache(self, file_paths: List[str], subsets: List[str]):
self._lookup: Dict[str, Dict[str, DatasetItem]] = {subset: {} for subset in subsets}

total = len(self)
cnt = 0
pbar = self._ctx.progress_reporter
pbar.start(total=total, desc="Importing")

for table_path in file_paths:
with pa.OSFile(table_path, "r") as source:
with pa.ipc.open_file(source) as reader:
table = reader.read_all()
for idx in range(len(table)):
item = DatasetItemMapper.backward(idx, table, table_path)
self._lookup[item.subset][item.id] = item
pbar.report_status(cnt)
cnt += 1

self._subsets = {
subset: ArrowSubsetBase(
lookup=lookup,
table=table,
infos=self._infos,
categories=self._categories,
subset=self._subsets,
media_type=self._media_type,
)
for subset, lookup in self._lookup.items()
}
self._table = table

pbar.finish()

def get(self, item_id: str, subset: Optional[str] = None) -> Optional[DatasetItem]:
subset = subset or DEFAULT_SUBSET_NAME

try:
table_idx = self._lookup[subset][item_id]
return DatasetItemMapper.backward(self._table[table_idx], self._table)
return self._lookup[subset][item_id]
except KeyError:
return None

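A standalone sketch of the shard-reading pattern used in `_init_cache` above; the file name follows the exporter's `"{prefix}-{suffix}.arrow"` convention but is illustrative:

```python
import pyarrow as pa

with pa.OSFile("datum-00000.arrow", "r") as source:
    with pa.ipc.open_file(source) as reader:
        table = reader.read_all()  # pa.Table with one row per dataset item

print(table.num_rows, table.column_names)
# Each row is then decoded back into a DatasetItem via
# DatasetItemMapper.backward(idx, table, table_path), as in the loader above.
```
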
75 changes: 36 additions & 39 deletions src/datumaro/plugins/data_formats/arrow/exporter.py
@@ -2,31 +2,18 @@
#
# SPDX-License-Identifier: MIT

from collections import defaultdict
import datetime
import os
import platform
import re
import struct
import tempfile
from copy import deepcopy
from functools import partial
from multiprocessing.pool import ApplyResult, AsyncResult, Pool
from multiprocessing.pool import Pool
from shutil import move, rmtree
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
from typing import Any, Callable, Dict, Iterator, Optional, Union

import memory_profiler
import numpy as np
import pyarrow as pa
import pytz

from datumaro.components.crypter import NULL_CRYPTER
from datumaro.components.dataset_base import DEFAULT_SUBSET_NAME, DatasetItem, IDataset
from datumaro.components.dataset_base import DatasetItem, IDataset
from datumaro.components.errors import DatumaroError
from datumaro.components.exporter import ExportContext, ExportContextComponent, Exporter
from datumaro.plugins.data_formats.datumaro.exporter import _SubsetWriter as __SubsetWriter
from datumaro.plugins.data_formats.datumaro_binary.mapper.common import DictMapper
from datumaro.util.file_utils import to_bytes
from datumaro.components.exporter import ExportContext, Exporter
from datumaro.util.multi_procs_util import consumer_generator

from .format import DatumaroArrow
@@ -58,12 +45,12 @@ def build_cmdline_parser(cls, **kwargs):
)

parser.add_argument(
"--max-chunk-size",
"--max-shard-size",
type=int,
default=1000,
help="The maximum number of dataset item can be stored in each shard file. "
"'--max-chunk-size' and '--num-shards' are mutually exclusive, "
"Therefore, if '--max-chunk-size' is not None, the number of shard files will be determined by "
"'--max-shard-size' and '--num-shards' are mutually exclusive, "
"Therefore, if '--max-shard-size' is not None, the number of shard files will be determined by "
"(# of total dataset item) / (max chunk size). "
"(default: %(default)s)",
)
@@ -73,7 +60,7 @@ def build_cmdline_parser(cls, **kwargs):
type=int,
default=None,
help="The number of shards to export. "
"'--max-chunk-size' and '--num-shards' are mutually exclusive. "
"'--max-shard-size' and '--num-shards' are mutually exclusive. "
"Therefore, if '--num-shards' is not None, the number of dataset item in each shard file "
"will be determined by (# of total dataset item) / (num shards). "
"(default: %(default)s)",
@@ -108,6 +95,8 @@ def _apply_impl(self, *args, **kwargs):
def _apply(self, pool: Optional[Pool] = None):
os.makedirs(self._save_dir, exist_ok=True)

pbar = self._ctx.progress_reporter

if pool is not None:

def _producer_gen():
@@ -119,26 +108,34 @@ def _producer_gen():
yield future

with consumer_generator(producer_generator=_producer_gen()) as consumer_gen:
self._write_file(consumer_gen)

def _gen_with_pbar():
for item in pbar.iter(
consumer_gen, desc="Exporting", total=len(self._extractor)
):
yield item.get()

self._write_file(_gen_with_pbar())

else:

def create_consumer_gen():
for item in self._extractor:
for item in pbar.iter(self._extractor, desc="Exporting"):
yield self._item_to_dict_record(item, self._image_ext, self._source_path)

self._write_file(create_consumer_gen())
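
A generic sketch of the produce-while-consume pattern that `_apply` uses above (illustrative only; Datumaro's `consumer_generator` utility adds buffering between the producer and this consumer loop, and the progress reporter wraps the iteration):

```python
from multiprocessing.pool import Pool

def square(x: int) -> int:
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # Producer: submit work and keep the AsyncResult futures in order.
        futures = [pool.apply_async(square, (i,)) for i in range(10)]
        # Consumer: resolve results in submission order, like _gen_with_pbar().
        results = [future.get() for future in futures]
    print(results)  # [0, 1, 4, 9, ...]
```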

def _write_file(self, consumer_gen: Iterator[Dict[str, Any]]) -> None:
for file_idx, size in enumerate(self._chunk_sizes):
suffix = str(file_idx).zfill(self._max_digits)
fname = f"{self._prefix}-{suffix}.arrow"
fpath = os.path.join(self._save_dir, fname)

record_batch = pa.RecordBatch.from_pylist(
mapping=[next(consumer_gen) for _ in range(size)],
schema=self._schema,
)

suffix = str(file_idx).zfill(self._max_digits)
fpath = os.path.join(self._save_dir, f"{self._prefix}-{suffix}.arrow")

with pa.OSFile(fpath, "wb") as sink:
with pa.ipc.new_file(sink, self._schema) as writer:
writer.write(record_batch)
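
A standalone sketch of the shard-writing pattern in `_write_file` above; the two-column schema and records are illustrative, not the real Datumaro Arrow schema:

```python
import pyarrow as pa

schema = pa.schema([("id", pa.string()), ("subset", pa.string())])
records = [
    {"id": "img_0001", "subset": "train"},
    {"id": "img_0002", "subset": "train"},
]

record_batch = pa.RecordBatch.from_pylist(mapping=records, schema=schema)

with pa.OSFile("datum-00000.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, schema) as writer:
        writer.write(record_batch)
```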
@@ -173,7 +170,7 @@ def __init__(
save_dataset_meta: bool = False,
ctx: Optional[ExportContext] = None,
num_workers: int = 0,
max_chunk_size: Optional[int] = 1000,
max_shard_size: Optional[int] = 1000,
num_shards: Optional[int] = None,
prefix: str = "datum",
**kwargs,
@@ -194,23 +191,23 @@
)
self._num_workers = num_workers

if num_shards is not None and max_chunk_size is not None:
if num_shards is not None and max_shard_size is not None:
raise DatumaroError(
"Both 'num_shards' or 'max_chunk_size' cannot be provided at the same time."
"Both 'num_shards' or 'max_shard_size' cannot be provided at the same time."
)
elif num_shards is not None and num_shards < 0:
raise DatumaroError(f"num_shards should be non-negative but num_shards={num_shards}.")
elif max_chunk_size is not None and max_chunk_size < 0:
elif max_shard_size is not None and max_shard_size < 0:
raise DatumaroError(
f"max_chunk_size should be non-negative but max_chunk_size={max_chunk_size}."
f"max_shard_size should be non-negative but max_shard_size={max_shard_size}."
)
elif num_shards is None and max_chunk_size is None:
elif num_shards is None and max_shard_size is None:
raise DatumaroError(
"Either one of 'num_shards' or 'max_chunk_size' should be provided."
"Either one of 'num_shards' or 'max_shard_size' should be provided."
)

self._num_shards = num_shards
self._max_chunk_size = max_chunk_size
self._max_shard_size = max_shard_size

if self._save_media:
self._image_ext = (
@@ -234,17 +231,17 @@
total_len = len(self._extractor)

if self._num_shards is not None:
max_chunk_size = int(total_len / self._num_shards) + 1
max_shard_size = int(total_len / self._num_shards) + 1

elif self._max_chunk_size is not None:
max_chunk_size = self._max_chunk_size
elif self._max_shard_size is not None:
max_shard_size = self._max_shard_size
else:
raise DatumaroError(
"Either one of 'num_shards' or 'max_chunk_size' should be provided."
"Either one of 'num_shards' or 'max_shard_size' should be provided."
)

self._chunk_sizes = np.diff(
np.array([size for size in range(0, total_len, max_chunk_size)] + [total_len])
np.array([size for size in range(0, total_len, max_shard_size)] + [total_len])
)
assert (
sum(self._chunk_sizes) == total_len
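
A worked example of the shard-size computation above (values are illustrative): with 2,500 items and `max_shard_size=1000`, the boundaries are `[0, 1000, 2000]` plus the total length, and `np.diff` yields the per-shard item counts.

```python
import numpy as np

total_len = 2500
max_shard_size = 1000

chunk_sizes = np.diff(
    np.array([size for size in range(0, total_len, max_shard_size)] + [total_len])
)
print(chunk_sizes)               # [1000 1000  500]
assert chunk_sizes.sum() == total_len
```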
