Skip to content

Commit

Permalink
Add types to exceptions.py (ros2#1241)
Browse files Browse the repository at this point in the history
* Add types to exception

* Add type checking guard

* Fix NotInitializedException

* Add missing defualt

Signed-off-by: Michael Carlstrom <rmc@carlstrom.com>
  • Loading branch information
InvincibleRMC committed Mar 24, 2024
1 parent 2af516d commit 9189941
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 30 deletions.
68 changes: 38 additions & 30 deletions rclpy/rclpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,32 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional, TYPE_CHECKING, Union

from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


class NotInitializedException(Exception):
"""Raised when the rclpy implementation is accessed before rclpy.init()."""

def __init__(self, *args):
Exception.__init__(self, 'rclpy.init() has not been called', *args)
def __init__(self, message: Optional[str] = None) -> None:
if message:
Exception.__init__(self, f'rclpy.init() has not been called. msg:{message}')
else:
Exception.__init__(self, 'rclpy.init() has not been called.')


class NoTypeSupportImportedException(Exception):
"""Raised when there is no type support imported."""

def __init__(self, *args):
def __init__(self) -> None:
Exception.__init__(self, 'no type_support imported')


class NameValidationException(Exception):
"""Raised when a topic name, node name, or namespace are invalid."""

def __init__(self, name_type, name, error_msg, invalid_index, *args):
def __init__(self, name_type: str, name: str, error_msg: str, invalid_index: int) -> None:
msg = """\
Invalid {name_type}: {error_msg}:
'{name}'
Expand All @@ -47,103 +51,107 @@ def __init__(self, name_type, name, error_msg, invalid_index, *args):
class InvalidNamespaceException(NameValidationException):
"""Raised when a namespace is invalid."""

def __init__(self, name, error_msg, invalid_index, *args):
def __init__(self, name: str, error_msg: str, invalid_index: int) -> None:
NameValidationException.__init__(self, 'namespace', name, error_msg, invalid_index)


class InvalidNodeNameException(NameValidationException):
"""Raised when a node name is invalid."""

def __init__(self, name, error_msg, invalid_index, *args):
def __init__(self, name: str, error_msg: str, invalid_index: int) -> None:
NameValidationException.__init__(self, 'node name', name, error_msg, invalid_index)


class InvalidTopicNameException(NameValidationException):
"""Raised when a topic name is invalid."""

def __init__(self, name, error_msg, invalid_index, *args):
def __init__(self, name: str, error_msg: str, invalid_index: int) -> None:
NameValidationException.__init__(self, 'topic name', name, error_msg, invalid_index)


class InvalidServiceNameException(NameValidationException):
"""Raised when a service name is invalid."""

def __init__(self, name, error_msg, invalid_index, *args):
def __init__(self, name: str, error_msg: str, invalid_index: int) -> None:
NameValidationException.__init__(self, 'service name', name, error_msg, invalid_index)


class ParameterException(Exception):
"""Base exception for parameter-related errors."""

def __init__(self, error_msg, parameters, *args):
def __init__(self, error_msg: str, parameters: Union[str, List[str]]) -> None:
Exception.__init__(self, f'{error_msg}: {parameters}')


class ParameterNotDeclaredException(ParameterException):
"""Raised when handling an undeclared parameter when it is not allowed."""

def __init__(self, parameters, *args):
Exception.__init__(self, 'Invalid access to undeclared parameter(s)', parameters, *args)
def __init__(self, parameters: Union[str, List[str]]) -> None:
ParameterException.__init__(self, 'Invalid access to undeclared parameter(s)', parameters)


class ParameterAlreadyDeclaredException(ParameterException):
"""Raised when declaring a parameter that had been declared before."""

def __init__(self, parameters, *args):
Exception.__init__(self, 'Parameter(s) already declared', parameters, *args)
def __init__(self, parameters: Union[str, List[str]]) -> None:
ParameterException.__init__(self, 'Parameter(s) already declared', parameters)


class InvalidParameterException(ParameterException):
"""Raised when a parameter to be declared has an invalid name."""

def __init__(self, parameter, *args):
Exception.__init__(self, 'Invalid parameter name', parameter, *args)
def __init__(self, parameter_name: str):
ParameterException.__init__(self, 'Invalid parameter name', parameter_name)


class InvalidParameterTypeException(ParameterException):
"""Raised when a parameter is rejected for having an invalid type."""

def __init__(self, desired_parameter, expected_type, *args):
from rclpy.parameter import Parameter

def __init__(self, desired_parameter: Parameter, expected_type: Parameter.Type) -> None:
from rclpy.parameter import Parameter
Exception.__init__(
ParameterException.__init__(
self,
f"Trying to set parameter '{desired_parameter._name}' to '{desired_parameter._value}'"
f" of type '{Parameter.Type.from_parameter_value(desired_parameter._value).name}'"
f", expecting type '{expected_type}'",
*args)
self._actual_type = desired_parameter.type_
self._param_name = desired_parameter.name
desired_parameter.name)


class InvalidParameterValueException(ParameterException):
"""Raised when a parameter is rejected by a user callback or when applying a descriptor."""

def __init__(self, parameter, value, reason, *args):
Exception.__init__(
if TYPE_CHECKING:
from rclpy.parameter import AllowableParameterValue

def __init__(self, parameter: str, value: 'AllowableParameterValue', reason: str) -> None:
ParameterException.__init__(
self,
'Invalid parameter value ({value}) for parameter. Reason: {reason}'.format(
value=value, reason=reason), parameter, *args)
value=value, reason=reason), parameter)


class ParameterImmutableException(ParameterException):
"""Raised when a read-only parameter is modified."""

def __init__(self, parameter, *args):
Exception.__init__(self, 'Attempted to modify read-only parameter', parameter, *args)
def __init__(self, parameter_name: str):
ParameterException.__init__(self, 'Attempted to modify read-only parameter',
parameter_name)


class ParameterUninitializedException(ParameterException):
"""Raised when an uninitialized parameter is accessed."""

def __init__(self, parameter_name, *args):
Exception.__init__(
def __init__(self, parameter_name: str):
ParameterException.__init__(
self,
f"The parameter '{parameter_name}' is not initialized",
*args)
parameter_name)


class ROSInterruptException(Exception):
"""Raised when an operation is canceled by rclpy shutting down."""

def __init__(self, *args):
Exception.__init__(self, 'rclpy.shutdown() has been called', *args)
def __init__(self) -> None:
Exception.__init__(self, 'rclpy.shutdown() has been called')
11 changes: 11 additions & 0 deletions rclpy/rclpy/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

from rcl_interfaces.msg import Parameter as ParameterMsg
from rcl_interfaces.msg import ParameterType
Expand All @@ -25,6 +28,14 @@

PARAMETER_SEPARATOR_STRING = '.'

if TYPE_CHECKING:
AllowableParameterValue = Union[None, bool, int, float, str,
List[bytes], Tuple[bytes, ...],
List[bool], Tuple[bool, ...],
List[int], Tuple[int, ...], array.array[int],
List[float], Tuple[float, ...], array.array[float],
List[str], Tuple[str, ...], array.array[str]]


class Parameter:

Expand Down

0 comments on commit 9189941

Please sign in to comment.