From cfb3fc76bd52546acb9ac5e74b001b379a8571df Mon Sep 17 00:00:00 2001 From: Takahiro Date: Sat, 24 Feb 2024 19:36:21 +0100 Subject: [PATCH] Moved get_layer_data to plugin manager base. Created erosion filter. --- .../config/core/plugin_config.yaml | 12 +- .../anymal/anymal_sensor_parameter.yaml | 2 +- .../elevation_mapping_cupy/plugins/erosion.py | 89 +++++++++++++ .../plugins/max_layer_filter.py | 55 ++++---- .../plugins/plugin_manager.py | 60 ++++++++- .../plugins/sqrt_filter.py | 122 ------------------ 6 files changed, 184 insertions(+), 156 deletions(-) create mode 100644 elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/erosion.py delete mode 100644 elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/sqrt_filter.py diff --git a/elevation_mapping_cupy/config/core/plugin_config.yaml b/elevation_mapping_cupy/config/core/plugin_config.yaml index 7eb62c4c..efb29b27 100644 --- a/elevation_mapping_cupy/config/core/plugin_config.yaml +++ b/elevation_mapping_cupy/config/core/plugin_config.yaml @@ -25,4 +25,14 @@ inpainting: layer_name: "inpaint" extra_params: method: "telea" # telea or ns -# Apply smoothing for inpainted layer \ No newline at end of file +# Apply smoothing for inpainted layer +erosion: + enable: True + fill_nan: False + is_height_layer: False + layer_name: "erosion" + extra_params: + input_layer_name: "traversability" + dilation_size: 3 + iteration_n: 20 + reverse: True \ No newline at end of file diff --git a/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml b/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml index a130607f..7f5bec6b 100644 --- a/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml +++ b/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml @@ -23,7 +23,7 @@ publishers: fps: 5.0 filtered_elevation_map: - layers: ['inpaint', 'smooth', 'min_filter'] + layers: ['inpaint', 'smooth', 'min_filter', 'erosion'] basic_layers: ['inpaint'] fps: 5.0 diff --git a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/erosion.py b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/erosion.py new file mode 100644 index 00000000..b78497b4 --- /dev/null +++ b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/erosion.py @@ -0,0 +1,89 @@ +# +# Copyright (c) 2024, Takahiro Miki. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# +import cv2 as cv +import cupy as cp +import numpy as np + +from typing import List + +from .plugin_manager import PluginBase + + +class Erosion(PluginBase): + """ + This class is used for applying erosion to an elevation map or specific layers within it. + Erosion is a morphological operation that is used to remove small-scale details from a binary image. + + Args: + kernel_size (int): Size of the erosion kernel. Default is 3, which means a 3x3 square kernel. + iterations (int): Number of times erosion is applied. Default is 1. + **kwargs (): Additional keyword arguments. + """ + + def __init__( + self, + input_layer_name="traversability", + kernel_size: int = 3, + iterations: int = 1, + reverse: bool = False, + **kwargs, + ): + super().__init__() + self.input_layer_name = input_layer_name + self.kernel_size = kernel_size + self.iterations = iterations + self.reverse = reverse + + def __call__( + self, + elevation_map: cp.ndarray, + layer_names: List[str], + plugin_layers: cp.ndarray, + plugin_layer_names: List[str], + semantic_map: cp.ndarray, + semantic_layer_names: List[str], + *args, + ) -> cp.ndarray: + """ + Applies erosion to the given elevation map. + + Args: + elevation_map (cupy._core.core.ndarray): The elevation map to be eroded. + layer_names (List[str]): Names of the layers in the elevation map. + plugin_layers (cupy._core.core.ndarray): Layers provided by other plugins. + plugin_layer_names (List[str]): Names of the layers provided by other plugins. + *args (): Additional arguments. + + Returns: + cupy._core.core.ndarray: The eroded elevation map. + """ + # Convert the elevation map to a format suitable for erosion (if necessary) + layer_data = self.get_layer_data( + elevation_map, + layer_names, + plugin_layers, + plugin_layer_names, + semantic_map, + semantic_layer_names, + self.input_layer_name, + ) + layer_np = cp.asnumpy(layer_data) + + # Define the erosion kernel + kernel = np.ones((self.kernel_size, self.kernel_size), np.uint8) + + if self.reverse: + layer_np = 1 - layer_np + # Apply erosion + layer_min = float(layer_np.min()) + layer_max = float(layer_np.max()) + layer_np_normalized = ((layer_np - layer_min) * 255 / (layer_max - layer_min)).astype("uint8") + eroded_map_np = cv.erode(layer_np_normalized, kernel, iterations=self.iterations) + eroded_map_np = eroded_map_np.astype(np.float32) * (layer_max - layer_min) / 255 + layer_min + if self.reverse: + eroded_map_np = 1 - eroded_map_np + + # Convert back to cupy array and return + return cp.asarray(eroded_map_np) diff --git a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/max_layer_filter.py b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/max_layer_filter.py index f6b2fc46..3b3dc3c6 100644 --- a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/max_layer_filter.py +++ b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/max_layer_filter.py @@ -15,9 +15,10 @@ class MaxLayerFilter(PluginBase): Args: cell_n (int): The width and height of the elevation map. + reverse (list): A list of boolean values indicating whether to reverse the filter operation for each layer. Default is [True]. + min_or_max (str): A string indicating whether to apply a minimum or maximum filter. Accepts "min" or "max". Default is "max". layers (list): List of layers for semantic traversability. Default is ["traversability"]. - thresholds (list): List of thresholds for each layer. Default is [0.5]. - type (list): List of types for each layer. Default is ["traversability"]. + thresholds (list): List of thresholds for each layer. If the value is bigger than a threshold, assign 1.0 otherwise 0.0. If it is False, it does not apply. Default is [False]. **kwargs: Additional keyword arguments. """ @@ -29,7 +30,6 @@ def __init__( min_or_max: str = "max", thresholds: list = [False], default_value: float = 0.0, - apply_sqrt: bool = True, **kwargs, ): super().__init__() @@ -38,31 +38,30 @@ def __init__( self.min_or_max = min_or_max self.thresholds = thresholds self.default_value = default_value - self.apply_sqrt = apply_sqrt - def get_layer_data( - self, - elevation_map, - layer_names, - plugin_layers, - plugin_layer_names, - semantic_map, - semantic_layer_names, - name, - ): - if name in layer_names: - idx = layer_names.index(name) - layer = elevation_map[idx].copy() - elif name in plugin_layer_names: - idx = plugin_layer_names.index(name) - layer = plugin_layers[idx].copy() - elif name in semantic_layer_names: - idx = semantic_layer_names.index(name) - layer = semantic_map[idx].copy() - else: - print(f"Could not find layer {name}!") - layer = None - return layer + # def get_layer_data( + # self, + # elevation_map, + # layer_names, + # plugin_layers, + # plugin_layer_names, + # semantic_map, + # semantic_layer_names, + # name, + # ): + # if name in layer_names: + # idx = layer_names.index(name) + # layer = elevation_map[idx].copy() + # elif name in plugin_layer_names: + # idx = plugin_layer_names.index(name) + # layer = plugin_layers[idx].copy() + # elif name in semantic_layer_names: + # idx = semantic_layer_names.index(name) + # layer = semantic_map[idx].copy() + # else: + # print(f"Could not find layer {name}!") + # layer = None + # return layer def __call__( self, @@ -121,6 +120,4 @@ def __call__( result = cp.min(result, axis=0) else: result = cp.max(result, axis=0) - if self.apply_sqrt: - result = cp.square(result) return result diff --git a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py index 8ff9fda1..26f81d21 100644 --- a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py +++ b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py @@ -4,7 +4,7 @@ # from abc import ABC import cupy as cp -from typing import List, Dict +from typing import List, Dict, Optional import importlib import inspect from dataclasses import dataclass @@ -39,6 +39,8 @@ def __call__( layer_names: List[str], plugin_layers: cp.ndarray, plugin_layer_names: List[str], + semantic_map: cp.ndarray, + semantic_layer_names: List[str], *args, **kwargs, ) -> cp.ndarray: @@ -50,6 +52,8 @@ def __call__( layer_names (): plugin_layers (): plugin_layer_names (): + semantic_map (): + semantic_layer_names (): Run your processing here and return the result. layer of elevation_map 0: elevation @@ -64,6 +68,45 @@ def __call__( """ pass + def get_layer_data( + self, + elevation_map: cp.ndarray, + layer_names: List[str], + plugin_layers: cp.ndarray, + plugin_layer_names: List[str], + semantic_map: cp.ndarray, + semantic_layer_names: List[str], + name: str, + ) -> Optional[cp.ndarray]: + """ + Retrieve a copy of the layer data from the elevation, plugin, or semantic maps based on the layer name. + + Args: + elevation_map (cp.ndarray): The elevation map containing various layers. + layer_names (List[str]): A list of names for each layer in the elevation map. + plugin_layers (cp.ndarray): The plugin layers containing additional data. + plugin_layer_names (List[str]): A list of names for each layer in the plugin layers. + semantic_map (cp.ndarray): The semantic map containing various layers. + semantic_layer_names (List[str]): A list of names for each layer in the semantic map. + name (str): The name of the layer to retrieve. + + Returns: + Optional[cp.ndarray]: A copy of the requested layer as a cupy ndarray if found, otherwise None. + """ + if name in layer_names: + idx = layer_names.index(name) + layer = elevation_map[idx].copy() + elif name in plugin_layer_names: + idx = plugin_layer_names.index(name) + layer = plugin_layers[idx].copy() + elif name in semantic_layer_names: + idx = semantic_layer_names.index(name) + layer = semantic_map[idx].copy() + else: + print(f"Could not find layer {name}!") + layer = None + return layer + class PluginManager(object): """ @@ -152,11 +195,22 @@ def update_with_name( self.layers[idx] = self.plugins[idx](elevation_map, layer_names, self.layers, self.layer_names) elif n_param == 7: self.layers[idx] = self.plugins[idx]( - elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params, + elevation_map, + layer_names, + self.layers, + self.layer_names, + semantic_map, + semantic_params, ) elif n_param == 8: self.layers[idx] = self.plugins[idx]( - elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params, rotation, + elevation_map, + layer_names, + self.layers, + self.layer_names, + semantic_map, + semantic_params, + rotation, ) else: self.layers[idx] = self.plugins[idx]( diff --git a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/sqrt_filter.py b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/sqrt_filter.py deleted file mode 100644 index 63c6a496..00000000 --- a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/sqrt_filter.py +++ /dev/null @@ -1,122 +0,0 @@ -# -# Copyright (c) 2022, Takahiro Miki. All rights reserved. -# Licensed under the MIT license. See LICENSE file in the project root for details. -# -import cupy as cp -import numpy as np -from typing import List - -from elevation_mapping_cupy.plugins.plugin_manager import PluginBase - - -class MaxLayerFilter(PluginBase): - """Applies a maximum filter to the input layers and updates the traversability map. - This can be used to enhance navigation by identifying traversable areas. - - Args: - cell_n (int): The width and height of the elevation map. - layers (list): List of layers for semantic traversability. Default is ["traversability"]. - thresholds (list): List of thresholds for each layer. Default is [0.5]. - type (list): List of types for each layer. Default is ["traversability"]. - **kwargs: Additional keyword arguments. - """ - - def __init__( - self, - cell_n: int = 100, - layers: list = ["traversability"], - reverse: list = [True], - min_or_max: str = "max", - thresholds: list = [False], - default_value: float = 0.0, - **kwargs, - ): - super().__init__() - self.layers = layers - self.reverse = reverse - self.min_or_max = min_or_max - self.thresholds = thresholds - self.default_value = default_value - - def get_layer_data( - self, - elevation_map, - layer_names, - plugin_layers, - plugin_layer_names, - semantic_map, - semantic_layer_names, - name, - ): - if name in layer_names: - idx = layer_names.index(name) - layer = elevation_map[idx].copy() - elif name in plugin_layer_names: - idx = plugin_layer_names.index(name) - layer = plugin_layers[idx].copy() - elif name in semantic_layer_names: - idx = semantic_layer_names.index(name) - layer = semantic_map[idx].copy() - else: - print(f"Could not find layer {name}!") - layer = None - return layer - - def __call__( - self, - elevation_map: cp.ndarray, - layer_names: List[str], - plugin_layers: cp.ndarray, - plugin_layer_names: List[str], - semantic_map: cp.ndarray, - semantic_layer_names: List[str], - *args, - ) -> cp.ndarray: - """ - - Args: - elevation_map (cupy._core.core.ndarray): - layer_names (List[str]): - plugin_layers (cupy._core.core.ndarray): - plugin_layer_names (List[str]): - semantic_map (elevation_mapping_cupy.semantic_map.SemanticMap): - *args (): - - Returns: - cupy._core.core.ndarray: - """ - layers = [] - for it, name in enumerate(self.layers): - layer = self.get_layer_data( - elevation_map, layer_names, plugin_layers, plugin_layer_names, semantic_map, semantic_layer_names, name - ) - if layer is None: - continue - if isinstance(self.default_value, float): - layer = cp.where(layer == 0.0, float(self.default_value), layer) - elif isinstance(self.default_value, str): - default_layer = self.get_layer_data( - elevation_map, - layer_names, - plugin_layers, - plugin_layer_names, - semantic_map, - semantic_layer_names, - self.default_value, - ) - layer = cp.where(layer == 0, default_layer, layer) - if self.reverse[it]: - layer = 1.0 - layer - if isinstance(self.thresholds[it], float): - layer = cp.where(layer > float(self.thresholds[it]), 1, 0) - layers.append(layer) - if len(layers) == 0: - print("No layers are found, returning traversability!") - idx = layer_names.index("traversability") - return elevation_map[idx] - result = cp.stack(layers, axis=0) - if self.min_or_max == "min": - result = cp.min(result, axis=0) - else: - result = cp.max(result, axis=0) - return result