1
- """Model Wrapper of OTX Visual Prompting."""
1
+ """Openvino Model Wrappers of OTX Visual Prompting."""
2
2
3
3
# Copyright (C) 2023 Intel Corporation
4
4
#
14
14
# See the License for the specific language governing permissions
15
15
# and limitations under the License.
16
16
17
- from typing import Any , Dict , Tuple
17
+ from copy import deepcopy
18
+ from typing import Any , Dict , List , Optional , Tuple , Union
18
19
19
20
import cv2
20
21
import numpy as np
21
- from openvino .model_api .models import ImageModel
22
- from openvino .model_api .models .types import NumericalValue
22
+ from openvino .model_api .adapters .inference_adapter import InferenceAdapter
23
+ from openvino .model_api .models import ImageModel , SegmentationModel
24
+ from openvino .model_api .models .types import NumericalValue , StringValue
23
25
24
- from otx .algorithms .segmentation .adapters .openvino .model_wrappers .blur import (
25
- BlurSegmentation ,
26
- )
27
26
from otx .api .utils .segmentation_utils import create_hard_prediction_from_soft_prediction
28
27
29
28
@@ -32,63 +31,93 @@ class ImageEncoder(ImageModel):
32
31
33
32
__model__ = "image_encoder"
34
33
34
def __init__(self, inference_adapter, configuration: Optional[dict] = None, preload: bool = False) -> None:
    """Build the image encoder wrapper by delegating to the parent constructor.

    Args:
        inference_adapter: Adapter object forwarded unchanged to the parent class.
        configuration (Optional[dict]): Configuration forwarded unchanged to the parent class.
        preload (bool): Flag forwarded unchanged to the parent class.
    """
    # No encoder-specific state is set here; every argument goes straight to the parent.
    super().__init__(inference_adapter, configuration, preload)
36
+
35
37
@classmethod
def parameters(cls) -> Dict[str, Any]:
    """Return the parent's parameters with ``resize_type`` defaulting to ``"fit_to_window"``."""
    params = super().parameters()
    # Replace the whole entry rather than editing the parent's one, so only this class is affected.
    params["resize_type"] = StringValue(default_value="fit_to_window")
    return params
42
46
47
def preprocess(self, inputs: np.ndarray) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
    """Run the parent preprocessing and record the resize policy in the returned meta.

    Args:
        inputs (np.ndarray): Image handed unchanged to the parent ``preprocess``.

    Returns:
        Tuple[Dict[str, np.ndarray], Dict[str, Any]]: The parent's model inputs, and the parent's
            meta dict with an added ``"resize_type"`` entry copied from ``self.resize_type``.
    """
    model_inputs, image_meta = super().preprocess(inputs)
    image_meta["resize_type"] = self.resize_type
    return model_inputs, image_meta
43
52
44
- class Decoder (BlurSegmentation ):
45
- """Decoder class for visual prompting of openvino model wrapper.
46
53
47
- TODO (sungchul): change parent class
48
- """
54
+ class Decoder ( SegmentationModel ):
55
+ """Decoder class for visual prompting of openvino model wrapper."""
49
56
50
57
__model__ = "decoder"
51
58
52
- def preprocess (self , bbox : np .ndarray , original_size : Tuple [int ]) -> Dict [str , Any ]:
53
- """Ready decoder inputs."""
54
- point_coords = bbox .reshape ((- 1 , 2 , 2 ))
55
- point_labels = np .array ([2 , 3 ], dtype = np .float32 ).reshape ((- 1 , 2 ))
56
- inputs_decoder = {
57
- "point_coords" : point_coords ,
58
- "point_labels" : point_labels ,
59
- # TODO (sungchul): how to generate mask_input and has_mask_input
60
- "mask_input" : np .zeros ((1 , 1 , 256 , 256 ), dtype = np .float32 ),
61
- "has_mask_input" : np .zeros ((1 , 1 ), dtype = np .float32 ),
62
- "orig_size" : np .array (original_size , dtype = np .float32 ).reshape ((- 1 , 2 )),
63
- }
64
- return inputs_decoder
59
def __init__(
    self,
    model_adapter: InferenceAdapter,
    configuration: Optional[dict] = None,
    preload: bool = False,
):
    """Build the decoder wrapper and fix the name of the output it reads.

    Args:
        model_adapter (InferenceAdapter): Adapter forwarded unchanged to the parent class.
        configuration (Optional[dict]): Configuration forwarded unchanged to the parent class.
        preload (bool): Flag forwarded unchanged to the parent class.
    """
    super().__init__(model_adapter, configuration, preload)
    # Assigned after the parent constructor so this value is the one left in place.
    self.output_blob_name = "low_res_masks"
65
67
66
68
@classmethod
def parameters(cls):
    """Return the parent's parameters extended with an integer ``image_size`` entry.

    ``image_size`` defaults to 1024 and is declared with bounds 0 and 2048.
    """
    params = super().parameters()
    params["image_size"] = NumericalValue(value_type=int, default_value=1024, min=0, max=2048)
    return params
71
73
74
def preprocess(self, inputs: Dict[str, Any], meta: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert box prompts into one decoder-input dict per prompt.

    Args:
        inputs (Dict[str, Any]): Reads ``"bboxes"`` and ``"labels"`` (iterated in lockstep) and
            ``"original_size"``. Each bbox must support ``reshape(-1, 2, 2)``.
        meta (Dict[str, Any]): Accepted for interface compatibility; not read here.

    Returns:
        List[Dict[str, Any]]: One dict per (bbox, label) pair with the keys ``"point_coords"``,
            ``"point_labels"``, ``"mask_input"``, ``"has_mask_input"``, ``"orig_size"`` and ``"label"``.
    """

    def build_prompt(box, label) -> Dict[str, Any]:
        # TODO (sungchul): add condition to check whether using bbox or point
        return {
            # Box corners rescaled to the resized-image frame.
            "point_coords": self._apply_coords(box.reshape(-1, 2, 2), inputs["original_size"]),
            # NOTE(review): 2 and 3 presumably mark the two box corners for the decoder - confirm.
            "point_labels": np.array([2, 3], dtype=np.float32).reshape((-1, 2)),
            # TODO (sungchul): how to generate mask_input and has_mask_input
            "mask_input": np.zeros((1, 1, 256, 256), dtype=np.float32),
            "has_mask_input": np.zeros((1, 1), dtype=np.float32),
            "orig_size": np.array(inputs["original_size"], dtype=np.float32).reshape((-1, 2)),
            "label": label,
        }

    # TODO (sungchul): process points
    return [build_prompt(box, label) for box, label in zip(inputs["bboxes"], inputs["labels"])]
94
+
95
+ def _apply_coords (self , coords : np .ndarray , original_size : Union [List [int ], Tuple [int , int ]]) -> np .ndarray :
96
+ """Process coords according to preprocessed image size using image meta."""
97
+ old_h , old_w = original_size
98
+ new_h , new_w = self ._get_preprocess_shape (original_size [0 ], original_size [1 ], self .image_size )
99
+ coords = deepcopy (coords ).astype (np .float32 )
100
+ coords [..., 0 ] = coords [..., 0 ] * (new_w / old_w )
101
+ coords [..., 1 ] = coords [..., 1 ] * (new_h / old_h )
102
+ return coords
103
+
104
+ def _get_preprocess_shape (self , old_h : int , old_w : int , image_size : int ) -> Tuple [int , int ]:
105
+ """Compute the output size given input size and target image size."""
106
+ scale = image_size / max (old_h , old_w )
107
+ new_h , new_w = old_h * scale , old_w * scale
108
+ new_w = int (new_w + 0.5 )
109
+ new_h = int (new_h + 0.5 )
110
+ return (new_h , new_w )
111
+
112
+ def _check_io_number (self , number_of_inputs , number_of_outputs ):
113
+ pass
114
+
72
115
def _get_inputs (self ):
73
116
"""Get input layer name and shape."""
74
117
image_blob_names = [name for name in self .inputs .keys ()]
75
118
image_info_blob_names = []
76
119
return image_blob_names , image_info_blob_names
77
120
78
- def _get_outputs (self ):
79
- """Get output layer name and shape."""
80
- layer_name = "low_res_masks"
81
- layer_shape = self .outputs [layer_name ].shape
82
-
83
- if len (layer_shape ) == 3 :
84
- self .out_channels = 0
85
- elif len (layer_shape ) == 4 :
86
- self .out_channels = layer_shape [1 ]
87
- else :
88
- raise Exception (f"Unexpected output layer shape { layer_shape } . Only 4D and 3D output layers are supported" )
89
-
90
- return layer_name
91
-
92
121
def postprocess (self , outputs : Dict [str , np .ndarray ], meta : Dict [str , Any ]) -> Tuple [np .ndarray , np .ndarray ]:
93
122
"""Postprocess to convert soft prediction to hard prediction.
94
123
@@ -102,10 +131,10 @@ def postprocess(self, outputs: Dict[str, np.ndarray], meta: Dict[str, Any]) -> T
102
131
"""
103
132
104
133
def sigmoid (x ):
105
- return 1 / ( 1 + np . exp ( - x ))
134
+ return np . tanh ( x * 0.5 ) * 0.5 + 0.5 # to avoid overflow
106
135
107
136
soft_prediction = outputs [self .output_blob_name ].squeeze ()
108
- soft_prediction = self .resize_and_crop (soft_prediction , meta ["original_size" ])
137
+ soft_prediction = self .resize_and_crop (soft_prediction , meta ["original_size" ][ 0 ] )
109
138
soft_prediction = sigmoid (soft_prediction )
110
139
meta ["soft_prediction" ] = soft_prediction
111
140
@@ -134,18 +163,18 @@ def resize_and_crop(self, soft_prediction: np.ndarray, original_size: np.ndarray
134
163
soft_prediction , (self .image_size , self .image_size ), 0 , 0 , interpolation = cv2 .INTER_LINEAR
135
164
)
136
165
137
- prepadded_size = self .resize_longest_image_size (original_size , self .image_size ).astype (np .int64 )
166
+ prepadded_size = self .get_padded_size (original_size , self .image_size ).astype (np .int64 )
138
167
resized_cropped_soft_prediction = resized_soft_prediction [..., : prepadded_size [0 ], : prepadded_size [1 ]]
139
168
140
169
original_size = original_size .astype (np .int64 )
141
- h , w = original_size [ 0 ], original_size [ 1 ]
170
+ h , w = original_size
142
171
final_soft_prediction = cv2 .resize (
143
172
resized_cropped_soft_prediction , (w , h ), 0 , 0 , interpolation = cv2 .INTER_LINEAR
144
173
)
145
174
return final_soft_prediction
146
175
147
- def resize_longest_image_size (self , original_size : np .ndarray , longest_side : int ) -> np .ndarray :
148
- """Resizes the longest side of the image to the given size .
176
+ def get_padded_size (self , original_size : np .ndarray , longest_side : int ) -> np .ndarray :
177
+ """Get padded size from original size and longest side of the image.
149
178
150
179
Args:
151
180
original_size (np.ndarray): The original image size with shape Bx2.
0 commit comments