Fix memory unbounded Arrow data format export/import #1169

Conversation

@vinnamkim (Contributor) commented Oct 16, 2023

Summary

  • Ticket no. 122601
  • Bump the Arrow data format export/import version from 1.0 to 2.0 to make them memory-bounded (a sketch of the idea follows the results table below)
|   | Before | After |
| :-: | :-: | :-: |
| export | ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/d5641aa7-5c2d-4f3d-899d-01f81cc0a7d1) | ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/b0b246a5-9f7a-449a-82d5-2c9893f6bbba) |
| import | ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/2c395306-5e8f-4813-a60e-afcbd954a66e) | ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/f38e1e73-e304-4586-a0c4-ad6891bbe37f) |
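For context, the gist of how v2.0 keeps export memory-bounded: items are flushed to disk in fixed-size record batches rather than accumulated into one big Arrow table. The sketch below is illustrative only, with a simplified two-column schema and a made-up helper name (`export_in_shards`); it is not the actual Datumaro implementation.

```python
import pyarrow as pa


def export_in_shards(items, shard_size, path_fmt="datum-{:04d}.arrow"):
    """Write (id, image_bytes) pairs into shard files of at most shard_size rows."""
    # Illustrative schema; the real format stores far richer metadata.
    schema = pa.schema([("id", pa.string()), ("image", pa.binary())])
    buf, shard_idx = [], 0

    def flush():
        nonlocal buf, shard_idx
        if not buf:
            return
        ids, images = zip(*buf)
        batch = pa.record_batch(
            [pa.array(list(ids), pa.string()), pa.array(list(images), pa.binary())],
            schema=schema,
        )
        # One file per shard: only `shard_size` items are ever held in memory.
        with pa.OSFile(path_fmt.format(shard_idx), "wb") as sink:
            with pa.ipc.new_file(sink, schema) as writer:
                writer.write_batch(batch)
        buf, shard_idx = [], shard_idx + 1

    for item_id, image_bytes in items:
        buf.append((item_id, image_bytes))
        if len(buf) >= shard_size:
            flush()
    flush()  # write the final, possibly partial, shard
```

With a shard size of 1000, peak memory stays near one shard's worth of items no matter how large the dataset grows, which is the flat profile visible in the After column.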

The following scripts were used for the experiment above.

1. Synthetic data preparation (10000 items, each with a 224x224 image and a label, exported to the Datumaro data format)
```python
import numpy as np
from datumaro.components.media import Image
from datumaro.components.project import Dataset
import os
from datumaro.components.dataset_base import DatasetItem
from datumaro.components.annotation import Label

from datumaro.util.image import encode_image

from tempfile import TemporaryDirectory
from datumaro.components.progress_reporting import TQDMProgressReporter


def fxt_large(test_dir, n=5000) -> Dataset:
    items = []
    for i in range(n):
        media = None
        # Cycle through the three media backends: in-memory array,
        # encoded PNG bytes, and a JPEG file on disk
        if i % 3 == 0:
            media = Image.from_numpy(data=np.random.randint(0, 255, (224, 224, 3)))
        elif i % 3 == 1:
            media = Image.from_bytes(
                data=encode_image(np.random.randint(0, 255, (224, 224, 3)), ".png")
            )
        elif i % 3 == 2:
            Image.from_numpy(data=np.random.randint(0, 255, (224, 224, 3))).save(
                os.path.join(test_dir, f"test{i}.jpg")
            )
            media = Image.from_file(path=os.path.join(test_dir, f"test{i}.jpg"))

        items.append(
            DatasetItem(
                id=i,
                subset="test",
                media=media,
                annotations=[Label(np.random.randint(0, 3))],
            )
        )

    source_dataset = Dataset.from_iterable(
        items,
        categories=["label"],
        media_type=Image,
    )

    return source_dataset


if __name__ == "__main__":
    source_dir = "source"
    os.makedirs(source_dir, exist_ok=True)
    with TemporaryDirectory() as test_dir:
        source = fxt_large(test_dir, n=10000)
        reporter = TQDMProgressReporter()
        source.export(
            source_dir,
            format="datumaro",
            save_media=True,
            progress_reporter=reporter,
        )
```
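As a quick sanity check on step 1 (hypothetical, not part of the original experiment), the exported source can be re-imported and its item count verified:

```python
from datumaro.components.dataset import Dataset

# Hypothetical check: the exported source should contain all 10000 items.
dataset = Dataset.import_from("source", format="datumaro")
assert len(dataset) == 10000
```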
2. Export 10000 items to Arrow data format
```python
import shutil
import os
from datumaro.components.progress_reporting import TQDMProgressReporter

from datumaro.components.dataset import StreamDataset

if __name__ == "__main__":
    source_dir = "source"

    source = StreamDataset.import_from(source_dir, format="datumaro")

    export_dir = "export"
    if os.path.exists(export_dir):
        shutil.rmtree(export_dir)

    reporter = TQDMProgressReporter()
    source.export(
        export_dir,
        format="arrow",
        save_media=True,
        max_shard_size=1000,
        progress_reporter=reporter,
    )
```
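Two things bound memory in this step: `StreamDataset` iterates the Datumaro source lazily instead of loading it whole, and `max_shard_size=1000` caps how many items accumulate before a shard is flushed, so the 10000 items should spread over roughly ten shard files. A quick (hypothetical) check, assuming the shards carry an `.arrow` suffix:

```python
import os

# Hypothetical sanity check: count the shard files the export produced.
shards = sorted(f for f in os.listdir("export") if f.endswith(".arrow"))
print(len(shards), shards[:3])
```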
3. Import 10000 items in the Arrow data format
```python
import pyarrow as pa
from random import shuffle
from datumaro.components.progress_reporting import TQDMProgressReporter
from time import time
from datumaro.components.dataset import Dataset
import memory_profiler
import shutil

if __name__ == "__main__":
    source_dir = "source"
    dst_dir = "source.backup"
    shutil.move(source_dir, dst_dir)

    export_dir = "export"
    reporter = TQDMProgressReporter()

    start = time()
    dataset = Dataset.import_from(export_dir, format="arrow", progress_reporter=reporter)
    keys = [(item.id, item.subset) for item in dataset]  # full scan over the dataset

    shuffle(keys)

    for item_id, subset in keys:
        item = dataset.get(item_id, subset)
        img_data = item.media.data  # force image decoding

    dt = time() - start
    print(f"dt={dt:.2f}")
    print(memory_profiler.memory_usage()[0])  # process memory usage in MiB
    print(pa.total_allocated_bytes())  # bytes held by Arrow's default memory pool

    shutil.move(dst_dir, source_dir)
```
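The import side stays bounded for a related reason: Arrow IPC files can be memory-mapped and read one record batch at a time, so random access pulls pages in on demand instead of loading a whole shard. A minimal sketch of that access pattern in plain pyarrow (again illustrative, not the actual importer code):

```python
import pyarrow as pa


def iter_batches(path):
    # Memory-map the shard file; each batch is a zero-copy view into the
    # mapping, so resident memory stays around one batch at a time.
    with pa.memory_map(path, "r") as source:
        reader = pa.ipc.open_file(source)
        for i in range(reader.num_record_batches):
            yield reader.get_batch(i)
```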

How to test

The changes are covered by the existing tests.

Checklist

  • I have added unit tests to cover my changes.
  • I have added integration tests to cover my changes.
  • I have added the description of my changes into CHANGELOG.
  • I have updated the documentation accordingly.

License

  • I submit my code changes under the same MIT License that covers the project.
    Feel free to contact the maintainers if that's a concern.
  • I have updated the license header for each file (see an example below).
```python
# Copyright (C) 2023 Intel Corporation
#
# SPDX-License-Identifier: MIT
```

Signed-off-by: Kim, Vinnam <vinnam.kim@intel.com>
@vinnamkim vinnamkim force-pushed the bugfix/reimpl-arrow-data-format branch from 7c0e5b6 to 138cfcc on October 16, 2023 09:41
@vinnamkim vinnamkim changed the title from "Fix Arrow data format unbounded export/import" to "Fix memory unbounded Arrow data format export/import" on Oct 16, 2023
@vinnamkim vinnamkim added this to the 1.5.1 milestone Oct 16, 2023
@vinnamkim vinnamkim added the BUG (Something isn't working) and data formats (PR is related to dataset formats) labels on Oct 16, 2023
@vinnamkim vinnamkim marked this pull request as ready for review October 16, 2023 09:42
@vinnamkim vinnamkim requested review from a team as code owners October 16, 2023 09:42
@vinnamkim vinnamkim requested review from wonjuleee and removed request for a team October 16, 2023 09:42
codecov bot commented Oct 16, 2023

Codecov Report

Attention: 45 lines in your changes are missing coverage. Please review.

Comparison is base (8b5fef0) 79.95% compared to head (5c89d37) 79.97%.

Additional details and impacted files
```
@@                Coverage Diff                 @@
##           releases/1.5.0    #1169      +/-   ##
==================================================
+ Coverage           79.95%   79.97%   +0.01%     
==================================================
  Files                 267      265       -2     
  Lines               29981    29706     -275     
  Branches             5898     5833      -65     
==================================================
- Hits                23972    23756     -216     
+ Misses               4650     4617      -33     
+ Partials             1359     1333      -26     
```
| Flag | Coverage Δ |
| --- | --- |
| macos-11_Python-3.8 | 79.09% <83.92%> (+<0.01%) ⬆️ |
| ubuntu-20.04_Python-3.8 | 79.97% <83.92%> (+0.01%) ⬆️ |
| windows-2022_Python-3.8 | 79.94% <83.92%> (?) |

Flags with carried forward coverage won't be shown.

| Files | Coverage Δ |
| --- | --- |
| src/datumaro/components/dataset_base.py | 96.77% <100.00%> (ø) |
| src/datumaro/components/format_detection.py | 93.68% <ø> (ø) |
| .../plugins/data_formats/arrow/mapper/dataset_item.py | 100.00% <100.00%> (+3.70%) ⬆️ |
| ...datumaro/plugins/data_formats/datumaro/exporter.py | 95.13% <100.00%> (+0.19%) ⬆️ |
| src/datumaro/plugins/data_formats/arrow/format.py | 82.35% <90.00%> (+9.01%) ⬆️ |
| ...atumaro/plugins/data_formats/arrow/mapper/media.py | 84.82% <93.75%> (+11.02%) ⬆️ |
| ...rc/datumaro/plugins/data_formats/arrow/importer.py | 89.47% <88.00%> (-5.13%) ⬇️ |
| src/datumaro/plugins/data_formats/arrow/base.py | 83.67% <82.41%> (-11.79%) ⬇️ |
| ...rc/datumaro/plugins/data_formats/arrow/exporter.py | 64.51% <67.16%> (-8.95%) ⬇️ |

... and 5 files with indirect coverage changes


@vinnamkim vinnamkim merged commit 1991cc6 into openvinotoolkit:releases/1.5.0 Oct 19, 2023
6 checks passed
@vinnamkim vinnamkim deleted the bugfix/reimpl-arrow-data-format branch October 19, 2023 04:34
yunchu pushed a commit to yunchu/datumaro that referenced this pull request Oct 23, 2023
Labels
BUG (Something isn't working), data formats (PR is related to dataset formats)
2 participants