Add multi-threading option to ModelTransform and SAMBboxToInstanceMask #1145

Merged
Changes from 4 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -37,6 +37,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
(<https://github.com/openvinotoolkit/datumaro/pull/1133>)
- Remove deprecates announced to be removed in 1.5.0
(<https://github.com/openvinotoolkit/datumaro/pull/1140>)
- Add multi-threading option to ModelTransform and SAMBboxToInstanceMask
(<https://github.com/openvinotoolkit/datumaro/pull/1145>)

### Bug fixes
- Fix bugs for Tile transform
24 changes: 16 additions & 8 deletions notebooks/18_bbox_to_instance_mask_using_sam.ipynb
@@ -153,14 +153,20 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we apply `SAMBboxToInstanceMask` to the dataset.\n",
"This transform requires some arguments to execute properly.\n",
"`inference_server_type` is the type of inference server which SAM encoder and decoder are deployed.\n",
"In this example, we launched the OpenVINO™ Model Server instance, thus please choose `InferenceServerType.ovms`.\n",
"The gRPC endpoint address was `localhost:8001`.\n",
"Therefore, `host=\"localhost\"`, `port=8001`, and `protocol_type=ProtocolType.grpc` should be given.\n",
"Lastly, you can make `Polygon` output for the instance mask, but this time we assign `to_polygon` to `False`,\n",
"so that the output will be `Mask` annotation type."
"Now, we apply `SAMBboxToInstanceMask` to the dataset. This transform requires several arguments to execute properly.\n",
"\n",
"`inference_server_type` represents the type of inference server on which SAM encoder and decoder are deployed. In this example, we launched the OpenVINO™ Model Server instance. Therefore, please select `InferenceServerType.ovms`.\n",
"\n",
"The gRPC endpoint address was `localhost:8001`. To configure this, provide the following parameters:\n",
"- `host=\"localhost\"`\n",
"- `port=8001`\n",
"- `protocol_type=ProtocolType.grpc`\n",
"\n",
"You can also specify a `timeout=60.0` value, which represents the maximum seconds to wait for a response from the server instance.\n",
"\n",
"Additionally, you can choose to produce `Polygon` output for the instance mask. However, in this case, we have set `to_polygon` to `False`, resulting in an output of the `Mask` annotation type.\n",
"\n",
"Lastly, we've set `num_workers=0`. This means we will use synchronous iteration to send a model inference request to the server instance and wait for the inference results. If you need to handle multiple inference requests concurrently, you can increase this value to utilize a thread pool. This is particularly useful when dealing with server instances that have high throughput."
]
},
{
@@ -178,8 +184,10 @@
" inference_server_type=InferenceServerType.ovms,\n",
" host=\"localhost\",\n",
" port=8001,\n",
" timeout=60.0,\n",
" protocol_type=ProtocolType.grpc,\n",
" to_polygon=False,\n",
" num_workers=0,\n",
" )"
]
},
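For a concrete picture of what this cell enables, below is a minimal sketch of the same transform with the thread pool turned on. The dataset path and the `num_workers=4` value are illustrative assumptions, and the import module for the server enums is assumed from this PR's plugin layout rather than taken from the notebook's earlier cells.

```python
from datumaro.components.dataset import Dataset

# Assumed import location for the server enums; verify against the notebook.
from datumaro.plugins.inference_server_plugin import InferenceServerType, ProtocolType
from datumaro.plugins.sam_transforms import SAMBboxToInstanceMask

# Hypothetical dataset path/format for illustration.
dataset = Dataset.import_from("./coco_dataset", format="coco_instances")

dataset = dataset.transform(
    SAMBboxToInstanceMask,
    inference_server_type=InferenceServerType.ovms,
    host="localhost",
    port=8001,
    timeout=60.0,
    protocol_type=ProtocolType.grpc,
    to_polygon=False,
    num_workers=4,  # >0 sends inference requests from a pool of 4 threads
)
```

With `num_workers > 0`, up to that many batches are in flight at once, which mainly pays off when the server instance can serve requests in parallel.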
10 changes: 9 additions & 1 deletion src/datumaro/components/dataset.py
@@ -443,6 +443,7 @@ def run_model(
*,
batch_size: int = 1,
append_annotation: bool = False,
num_workers: int = 0,
**kwargs,
) -> Dataset:
"""
@@ -454,6 +455,8 @@
batch_size: The number of dataset items processed
simultaneously by the model
append_annotation: Whether to append new annotations to existing annotations
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-threaded (synchronous) inference. Default is 0.
**kwargs: Parameters for the model

Returns: self
@@ -465,11 +468,16 @@
launcher=model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
elif inspect.isclass(model) and issubclass(model, ModelTransform):
return self.transform(
model, batch_size=batch_size, append_annotation=append_annotation, **kwargs
model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
else:
raise TypeError("Unexpected 'model' argument type: %s" % type(model))
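To make the extended signature concrete, a call using the new option is sketched below; `dataset` and `launcher` are placeholders for a datumaro `Dataset` and any configured `Launcher` instance (for example, the OVMS launcher exercised elsewhere in this PR), not defined in this sketch.

```python
result = dataset.run_model(
    launcher,
    batch_size=4,   # items grouped into a single inference request
    num_workers=2,  # two threads issue requests concurrently; 0 keeps the old synchronous path
)
```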
11 changes: 10 additions & 1 deletion src/datumaro/components/hl_ops/__init__.py
@@ -195,6 +195,7 @@ def run_model(
*,
batch_size: int = 1,
append_annotation: bool = False,
num_workers: int = 0,
**kwargs,
) -> IDataset:
"""
@@ -207,6 +208,8 @@
batch_size: The number of dataset items processed
simultaneously by the model
append_annotation: Whether to append new annotations to existing annotations
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-threaded (synchronous) inference. Default is 0.
**kwargs: Parameters for the model

Returns: a wrapper around the input dataset, which is computed lazily
@@ -220,11 +223,17 @@
launcher=model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
elif inspect.isclass(model) and issubclass(model, ModelTransform):
return HLOps.transform(
dataset, model, batch_size=batch_size, append_annotation=append_annotation, **kwargs
dataset,
model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
else:
raise TypeError(f"Unexpected model argument type: {type(model)}")
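`HLOps.run_model` mirrors the `Dataset.run_model` change above, so the same option is available through the functional interface; as before, `dataset` and `launcher` are placeholders for an `IDataset` and a configured `Launcher`.

```python
from datumaro.components.hl_ops import HLOps

result = HLOps.run_model(dataset, launcher, batch_size=4, num_workers=2)
```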
84 changes: 69 additions & 15 deletions src/datumaro/components/transformer.py
@@ -1,7 +1,8 @@
# Copyright (C) 2019-2022 Intel Corporation
#
# SPDX-License-Identifier: MIT
from typing import Generator, List, Optional
from multiprocessing.pool import ThreadPool
from typing import Iterator, List, Optional

import numpy as np

@@ -10,6 +11,7 @@
from datumaro.components.dataset_base import DatasetBase, DatasetItem, IDataset
from datumaro.components.launcher import Launcher
from datumaro.util import is_method_redefined, take_by
from datumaro.util.multi_procs_util import consumer_generator


class Transform(DatasetBase, CliPlugin):
@@ -71,35 +73,87 @@


class ModelTransform(Transform):
"""A transformation class for applying a model's inference to dataset items.

This class takes a dataset, a launcher, and other optional parameters
to transform dataset items using the model outputs produced by the launcher.
It can process items with multiple worker threads if specified, making it suitable for
parallelized inference tasks.

Parameters:
extractor: The dataset extractor to obtain items from.
launcher: The launcher responsible for model inference.
batch_size: The batch size for processing items. Default is 1.
append_annotation: Whether to append inference annotations to existing annotations.
Default is False.
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-threaded (synchronous) inference. Default is 0.
"""

def __init__(
self,
extractor: IDataset,
launcher: Launcher,
batch_size: int = 1,
append_annotation: bool = False,
num_workers: int = 0,
):
super().__init__(extractor)
self._launcher = launcher
self._batch_size = batch_size
self._append_annotation = append_annotation

def __iter__(self) -> Generator[DatasetItem, None, None]:
for batch in take_by(self._extractor, self._batch_size):
inference = self._launcher.launch(
[item for item in batch if self._launcher.type_check(item)]
if not (isinstance(num_workers, int) and num_workers >= 0):
raise ValueError(
f"num_workers should be a non negative integer, but it is {num_workers}"
)

for item in self._yield_item(batch, inference):
self._num_workers = num_workers

def __iter__(self) -> Iterator[DatasetItem]:
if self._num_workers == 0:
return self._iter_single_proc()
return self._iter_multi_procs()

def _iter_multi_procs(self):
with ThreadPool(processes=self._num_workers) as pool:

def _producer_gen():
for batch in take_by(self._extractor, self._batch_size):
future = pool.apply_async(
func=self._process_batch,
args=(batch,),
)
yield future

with consumer_generator(producer_generator=_producer_gen) as consumer_gen:
for future in consumer_gen():
for item in future.get():
yield item

def _iter_single_proc(self) -> Iterator[DatasetItem]:
for batch in take_by(self._extractor, self._batch_size):
for item in self._process_batch(batch=batch):
yield item

def _yield_item(
self, batch: List[DatasetItem], inference: List[List[Annotation]]
) -> Generator[DatasetItem, None, None]:
for item, annotations in zip(batch, inference):
def _process_batch(
self,
batch: List[DatasetItem],
) -> List[DatasetItem]:
inference = self._launcher.launch(
batch=[item for item in batch if self._launcher.type_check(item)]
)

for annotations in inference:
self._check_annotations(annotations)
if self._append_annotation:
annotations = item.annotations + annotations
yield self.wrap_item(item, annotations=annotations)

return [
self.wrap_item(
item,
annotations=item.annotations + annotations
if self._append_annotation
else annotations,
)
for item, annotations in zip(batch, inference)
]

def get_subset(self, name):
subset = self._extractor.get_subset(name)
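The ordered fan-out/fan-in idea behind `_iter_multi_procs` can be shown in isolation. The standalone sketch below uses only the standard library; unlike the implementation above, it keeps the whole list of futures in memory instead of bounding in-flight batches through `consumer_generator`, so treat it as a demonstration of the ordering guarantee rather than a drop-in equivalent.

```python
from multiprocessing.pool import ThreadPool
from typing import Iterable, Iterator, List


def take_by(iterable: Iterable[int], count: int) -> Iterator[List[int]]:
    """Simplified stand-in for datumaro.util.take_by: yields fixed-size chunks."""
    chunk: List[int] = []
    for value in iterable:
        chunk.append(value)
        if len(chunk) == count:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def process_batch(batch: List[int]) -> List[int]:
    """Stand-in for one model inference request on a batch."""
    return [value * value for value in batch]


def iter_multi_threaded(items: Iterable[int], batch_size: int, num_workers: int) -> Iterator[int]:
    with ThreadPool(processes=num_workers) as pool:
        # Futures are created in batch order and awaited in the same order,
        # so results keep the dataset's item order even though up to
        # `num_workers` batches run concurrently.
        futures = [
            pool.apply_async(process_batch, (batch,)) for batch in take_by(items, batch_size)
        ]
        for future in futures:
            yield from future.get()


print(list(iter_multi_threaded(range(10), batch_size=3, num_workers=2)))
# -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

A thread pool (rather than a process pool) fits this workload because the workers spend most of their time blocked on network I/O to the inference server, where the GIL is released.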
21 changes: 15 additions & 6 deletions src/datumaro/plugins/missing_annotation_detection.py
@@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: MIT

from typing import Generator, List, Optional, Set
from typing import List, Optional, Set

from datumaro.components.abstracts.merger import IMatcherContext
from datumaro.components.annotation import Annotation, AnnotationType, LabelCategories
@@ -83,18 +83,27 @@ def get_any_label_name(self, ann: Annotation, label_id: int) -> str:
),
}

def _yield_item(
self, batch: List[DatasetItem], inference: List[List[Annotation]]
) -> Generator[DatasetItem, None, None]:
for item, annotations in zip(batch, inference):
def _process_batch(
self,
batch: List[DatasetItem],
) -> List[DatasetItem]:
inference = self._launcher.launch(
batch=[item for item in batch if self._launcher.type_check(item)]
)

for annotations in inference:
self._check_annotations(annotations)
yield self.wrap_item(

return [
self.wrap_item(
item,
annotations=self._find_missing_anns(
gt_anns=item.annotations,
pseudo_anns=self._apply_score_threshold(annotations),
),
)
for item, annotations in zip(batch, inference)
]

def _apply_score_threshold(self, annotations: List[Annotation]) -> List[Annotation]:
if self._score_threshold is None:
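Note the shape this override shares with `SAMBboxToInstanceMask` below: subclasses now customize `_process_batch` instead of `__iter__`, so the single-threaded/multi-threaded dispatch is inherited from `ModelTransform` unchanged. A hypothetical subclass following the same recipe might look like this (the score-threshold logic is invented for illustration and is not part of this PR):

```python
from typing import List

from datumaro.components.dataset_base import DatasetItem
from datumaro.components.transformer import ModelTransform


class KeepConfidentPredictions(ModelTransform):
    """Hypothetical transform: keeps only predictions scoring above 0.5."""

    def _process_batch(self, batch: List[DatasetItem]) -> List[DatasetItem]:
        inference = self._launcher.launch(
            batch=[item for item in batch if self._launcher.type_check(item)]
        )
        return [
            self.wrap_item(
                item,
                annotations=[
                    ann
                    for ann in annotations
                    # `score` is a conventional attribute name, assumed here
                    if ann.attributes.get("score", 0.0) > 0.5
                ],
            )
            for item, annotations in zip(batch, inference)
        ]
```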
37 changes: 24 additions & 13 deletions src/datumaro/plugins/sam_transforms/bbox_to_inst_mask.py
@@ -4,7 +4,7 @@
"""Bbox-to-instance mask transform using Segment Anything Model"""

import os.path as osp
from typing import Generator, List, Optional
from typing import List, Optional

import datumaro.plugins.sam_transforms.interpreters.sam_decoder_for_bbox as sam_decoder_for_bbox_interp
import datumaro.plugins.sam_transforms.interpreters.sam_encoder as sam_encoder_interp
@@ -18,7 +18,6 @@
ProtocolType,
TLSConfig,
)
from datumaro.util import take_by
from datumaro.util.mask_tools import extract_contours

__all__ = ["SAMBboxToInstanceMask"]
@@ -44,6 +43,8 @@ class SAMBboxToInstanceMask(ModelTransform, CliPlugin):
tls_config: Configuration required if the server instance is in the secure mode
protocol_type: Communication protocol type with the server instance
to_polygon: If true, the output `Mask` annotations will be converted to `Polygon` annotations.
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-threaded (synchronous) inference. Default is 0.
"""

def __init__(
@@ -56,6 +57,7 @@ def __init__(
tls_config: Optional[TLSConfig] = None,
protocol_type: ProtocolType = ProtocolType.grpc,
to_polygon: bool = False,
num_workers: int = 0,
):
if inference_server_type == InferenceServerType.ovms:
launcher_cls = OVMSLauncher
@@ -90,26 +92,35 @@ def __init__(
launcher=self._sam_encoder_launcher,
batch_size=1,
append_annotation=False,
num_workers=num_workers,
)
self._to_polygon = to_polygon

def __iter__(self) -> Generator[DatasetItem, None, None]:
for batch in take_by(self._extractor, self._batch_size):
batch = [item for item in batch if self._launcher.type_check(item)]
img_embeds = self._sam_encoder_launcher.launch(batch)
def _process_batch(
self,
batch: List[DatasetItem],
) -> List[DatasetItem]:
print("Process batch")
img_embeds = self._sam_encoder_launcher.launch(
batch=[item for item in batch if self._sam_encoder_launcher.type_check(item)]
)

for item, img_embed in zip(batch, img_embeds):
# Nested list of mask [[mask_0, ...]]
nested_masks: List[List[Mask]] = self._sam_decoder_launcher.launch(
[item.wrap(annotations=item.annotations + img_embed)],
stack=False,
)
items = []
for item, img_embed in zip(batch, img_embeds):
# Nested list of mask [[mask_0, ...]]
nested_masks: List[List[Mask]] = self._sam_decoder_launcher.launch(
[item.wrap(annotations=item.annotations + img_embed)],
stack=False,
)

yield item.wrap(
items.append(
item.wrap(
annotations=self._convert_to_polygon(nested_masks[0])
if self._to_polygon
else nested_masks[0]
)
)
return items

@staticmethod
def _convert_to_polygon(masks: List[Mask]):