diff --git a/elevation_mapping_cupy/config/core/plugin_config.yaml b/elevation_mapping_cupy/config/core/plugin_config.yaml index 7eb62c4c..efb29b27 100644 --- a/elevation_mapping_cupy/config/core/plugin_config.yaml +++ b/elevation_mapping_cupy/config/core/plugin_config.yaml @@ -25,4 +25,14 @@ inpainting: layer_name: "inpaint" extra_params: method: "telea" # telea or ns -# Apply smoothing for inpainted layer \ No newline at end of file +# Apply smoothing for inpainted layer +erosion: + enable: True + fill_nan: False + is_height_layer: False + layer_name: "erosion" + extra_params: + input_layer_name: "traversability" + dilation_size: 3 + iteration_n: 20 + reverse: True \ No newline at end of file diff --git a/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml b/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml index a130607f..7f5bec6b 100644 --- a/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml +++ b/elevation_mapping_cupy/config/setups/anymal/anymal_sensor_parameter.yaml @@ -23,7 +23,7 @@ publishers: fps: 5.0 filtered_elevation_map: - layers: ['inpaint', 'smooth', 'min_filter'] + layers: ['inpaint', 'smooth', 'min_filter', 'erosion'] basic_layers: ['inpaint'] fps: 5.0 diff --git a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/erosion.py b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/erosion.py new file mode 100644 index 00000000..b78497b4 --- /dev/null +++ b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/erosion.py @@ -0,0 +1,89 @@ +# +# Copyright (c) 2024, Takahiro Miki. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# +import cv2 as cv +import cupy as cp +import numpy as np + +from typing import List + +from .plugin_manager import PluginBase + + +class Erosion(PluginBase): + """ + This class is used for applying erosion to an elevation map or specific layers within it. + Erosion is a morphological operation that is used to remove small-scale details from a binary image. + + Args: + kernel_size (int): Size of the erosion kernel. Default is 3, which means a 3x3 square kernel. + iterations (int): Number of times erosion is applied. Default is 1. + **kwargs (): Additional keyword arguments. + """ + + def __init__( + self, + input_layer_name="traversability", + kernel_size: int = 3, + iterations: int = 1, + reverse: bool = False, + **kwargs, + ): + super().__init__() + self.input_layer_name = input_layer_name + self.kernel_size = kernel_size + self.iterations = iterations + self.reverse = reverse + + def __call__( + self, + elevation_map: cp.ndarray, + layer_names: List[str], + plugin_layers: cp.ndarray, + plugin_layer_names: List[str], + semantic_map: cp.ndarray, + semantic_layer_names: List[str], + *args, + ) -> cp.ndarray: + """ + Applies erosion to the given elevation map. + + Args: + elevation_map (cupy._core.core.ndarray): The elevation map to be eroded. + layer_names (List[str]): Names of the layers in the elevation map. + plugin_layers (cupy._core.core.ndarray): Layers provided by other plugins. + plugin_layer_names (List[str]): Names of the layers provided by other plugins. + *args (): Additional arguments. + + Returns: + cupy._core.core.ndarray: The eroded elevation map. + """ + # Convert the elevation map to a format suitable for erosion (if necessary) + layer_data = self.get_layer_data( + elevation_map, + layer_names, + plugin_layers, + plugin_layer_names, + semantic_map, + semantic_layer_names, + self.input_layer_name, + ) + layer_np = cp.asnumpy(layer_data) + + # Define the erosion kernel + kernel = np.ones((self.kernel_size, self.kernel_size), np.uint8) + + if self.reverse: + layer_np = 1 - layer_np + # Apply erosion + layer_min = float(layer_np.min()) + layer_max = float(layer_np.max()) + layer_np_normalized = ((layer_np - layer_min) * 255 / (layer_max - layer_min)).astype("uint8") + eroded_map_np = cv.erode(layer_np_normalized, kernel, iterations=self.iterations) + eroded_map_np = eroded_map_np.astype(np.float32) * (layer_max - layer_min) / 255 + layer_min + if self.reverse: + eroded_map_np = 1 - eroded_map_np + + # Convert back to cupy array and return + return cp.asarray(eroded_map_np) diff --git a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/max_layer_filter.py b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/max_layer_filter.py new file mode 100644 index 00000000..1c4e7853 --- /dev/null +++ b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/max_layer_filter.py @@ -0,0 +1,99 @@ +# +# Copyright (c) 2022, Takahiro Miki. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# +import cupy as cp +import numpy as np +from typing import List + +from elevation_mapping_cupy.plugins.plugin_manager import PluginBase + + +class MaxLayerFilter(PluginBase): + """Applies a maximum filter to the input layers and updates the traversability map. + This can be used to enhance navigation by identifying traversable areas. + + Args: + cell_n (int): The width and height of the elevation map. + reverse (list): A list of boolean values indicating whether to reverse the filter operation for each layer. Default is [True]. + min_or_max (str): A string indicating whether to apply a minimum or maximum filter. Accepts "min" or "max". Default is "max". + layers (list): List of layers for semantic traversability. Default is ["traversability"]. + thresholds (list): List of thresholds for each layer. If the value is bigger than a threshold, assign 1.0 otherwise 0.0. If it is False, it does not apply. Default is [False]. + **kwargs: Additional keyword arguments. + """ + + def __init__( + self, + cell_n: int = 100, + layers: list = ["traversability"], + reverse: list = [True], + min_or_max: str = "max", + thresholds: list = [False], + default_value: float = 0.0, + **kwargs, + ): + super().__init__() + self.layers = layers + self.reverse = reverse + self.min_or_max = min_or_max + self.thresholds = thresholds + self.default_value = default_value + + def __call__( + self, + elevation_map: cp.ndarray, + layer_names: List[str], + plugin_layers: cp.ndarray, + plugin_layer_names: List[str], + semantic_map: cp.ndarray, + semantic_layer_names: List[str], + *args, + ) -> cp.ndarray: + """ + + Args: + elevation_map (cupy._core.core.ndarray): + layer_names (List[str]): + plugin_layers (cupy._core.core.ndarray): + plugin_layer_names (List[str]): + semantic_map (elevation_mapping_cupy.semantic_map.SemanticMap): + *args (): + + Returns: + cupy._core.core.ndarray: + """ + layers = [] + for it, name in enumerate(self.layers): + layer = self.get_layer_data( + elevation_map, layer_names, plugin_layers, plugin_layer_names, semantic_map, semantic_layer_names, name + ) + if layer is None: + continue + if isinstance(self.default_value, float): + layer = cp.where(layer == 0.0, float(self.default_value), layer) + elif isinstance(self.default_value, str): + default_layer = self.get_layer_data( + elevation_map, + layer_names, + plugin_layers, + plugin_layer_names, + semantic_map, + semantic_layer_names, + self.default_value, + ) + layer = cp.where(layer == 0, default_layer, layer) + if self.reverse[it]: + layer = 1.0 - layer + if isinstance(self.thresholds[it], float): + layer = cp.where(layer > float(self.thresholds[it]), 1, 0) + layers.append(layer) + if len(layers) == 0: + print("No layers are found, returning traversability!") + idx = layer_names.index("traversability") + return elevation_map[idx] + result = cp.stack(layers, axis=0) + if self.min_or_max == "min": + result = cp.min(result, axis=0) + else: + result = cp.max(result, axis=0) + return result diff --git a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py index 8ff9fda1..26f81d21 100644 --- a/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py +++ b/elevation_mapping_cupy/script/elevation_mapping_cupy/plugins/plugin_manager.py @@ -4,7 +4,7 @@ # from abc import ABC import cupy as cp -from typing import List, Dict +from typing import List, Dict, Optional import importlib import inspect from dataclasses import dataclass @@ -39,6 +39,8 @@ def __call__( layer_names: List[str], plugin_layers: cp.ndarray, plugin_layer_names: List[str], + semantic_map: cp.ndarray, + semantic_layer_names: List[str], *args, **kwargs, ) -> cp.ndarray: @@ -50,6 +52,8 @@ def __call__( layer_names (): plugin_layers (): plugin_layer_names (): + semantic_map (): + semantic_layer_names (): Run your processing here and return the result. layer of elevation_map 0: elevation @@ -64,6 +68,45 @@ def __call__( """ pass + def get_layer_data( + self, + elevation_map: cp.ndarray, + layer_names: List[str], + plugin_layers: cp.ndarray, + plugin_layer_names: List[str], + semantic_map: cp.ndarray, + semantic_layer_names: List[str], + name: str, + ) -> Optional[cp.ndarray]: + """ + Retrieve a copy of the layer data from the elevation, plugin, or semantic maps based on the layer name. + + Args: + elevation_map (cp.ndarray): The elevation map containing various layers. + layer_names (List[str]): A list of names for each layer in the elevation map. + plugin_layers (cp.ndarray): The plugin layers containing additional data. + plugin_layer_names (List[str]): A list of names for each layer in the plugin layers. + semantic_map (cp.ndarray): The semantic map containing various layers. + semantic_layer_names (List[str]): A list of names for each layer in the semantic map. + name (str): The name of the layer to retrieve. + + Returns: + Optional[cp.ndarray]: A copy of the requested layer as a cupy ndarray if found, otherwise None. + """ + if name in layer_names: + idx = layer_names.index(name) + layer = elevation_map[idx].copy() + elif name in plugin_layer_names: + idx = plugin_layer_names.index(name) + layer = plugin_layers[idx].copy() + elif name in semantic_layer_names: + idx = semantic_layer_names.index(name) + layer = semantic_map[idx].copy() + else: + print(f"Could not find layer {name}!") + layer = None + return layer + class PluginManager(object): """ @@ -152,11 +195,22 @@ def update_with_name( self.layers[idx] = self.plugins[idx](elevation_map, layer_names, self.layers, self.layer_names) elif n_param == 7: self.layers[idx] = self.plugins[idx]( - elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params, + elevation_map, + layer_names, + self.layers, + self.layer_names, + semantic_map, + semantic_params, ) elif n_param == 8: self.layers[idx] = self.plugins[idx]( - elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params, rotation, + elevation_map, + layer_names, + self.layers, + self.layer_names, + semantic_map, + semantic_params, + rotation, ) else: self.layers[idx] = self.plugins[idx](