From 0bca600e77e21d1bfa8900ae3fbef8c79d315b48 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom <36806982+InvincibleRMC@users.noreply.github.com> Date: Sun, 24 Mar 2024 19:57:16 -0400 Subject: [PATCH] Add types to qos_overriding_options.py (#1248) Signed-off-by: Michael Carlstrom Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos_overriding_options.py | 45 +++++++++++++++++---------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/rclpy/rclpy/qos_overriding_options.py b/rclpy/rclpy/qos_overriding_options.py index 316b59433..f64b83d30 100644 --- a/rclpy/rclpy/qos_overriding_options.py +++ b/rclpy/rclpy/qos_overriding_options.py @@ -14,6 +14,7 @@ from typing import Callable from typing import Iterable +from typing import List from typing import Optional from typing import Text from typing import Type @@ -28,11 +29,16 @@ from rclpy.exceptions import ParameterAlreadyDeclaredException from rclpy.parameter import Parameter from rclpy.publisher import Publisher +from rclpy.qos import QoSDurabilityPolicy +from rclpy.qos import QoSHistoryPolicy +from rclpy.qos import QoSLivelinessPolicy from rclpy.qos import QoSPolicyKind from rclpy.qos import QoSProfile +from rclpy.qos import QoSReliabilityPolicy from rclpy.subscription import Subscription if TYPE_CHECKING: + from typing import TypeAlias from rclpy.node import Node @@ -41,7 +47,7 @@ class InvalidQosOverridesError(Exception): # Return type of qos validation callbacks -QosCallbackResult = SetParametersResult +QosCallbackResult: 'TypeAlias' = SetParametersResult # Qos callback type annotation QosCallbackType = Callable[[QoSProfile], QosCallbackResult] @@ -103,7 +109,7 @@ def _declare_qos_parameters( topic_name: Text, qos: QoSProfile, options: QoSOverridingOptions -) -> QoSProfile: +) -> None: """ Declare QoS parameters for a Publisher or a Subscription. @@ -143,37 +149,42 @@ def _declare_qos_parameters( f"{description.format('Provided QoS overrides')}, are not valid: {result.reason}") -def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]): +def _get_allowed_policies(entity_type: Union[Type[Publisher], + Type[Subscription]]) -> List[QoSPolicyKind]: allowed_policies = list(QoSPolicyKind.__members__.values()) if issubclass(entity_type, Subscription): allowed_policies.remove(QoSPolicyKind.LIFESPAN) return allowed_policies +QoSProfileAttributes = Union[QoSHistoryPolicy, int, QoSReliabilityPolicy, QoSDurabilityPolicy, + Duration, QoSLivelinessPolicy, bool] + + def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]: - value = getattr(qos, policy.name.lower()) - if policy in ( - QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY, - QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY - ): - value = value.name.lower() - if value == 'unknown': + value: QoSProfileAttributes = getattr(qos, policy.name.lower()) + if isinstance(value, (QoSHistoryPolicy, QoSReliabilityPolicy, + QoSDurabilityPolicy, QoSLivelinessPolicy)): + return_value: Union[str, int, bool] = value.name.lower() + if return_value == 'unknown': raise ValueError('User provided QoS profile is invalid') - if policy in ( - QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION - ): - value = value.nanoseconds - return value + elif isinstance(value, Duration): + return_value = value.nanoseconds + else: + return_value = value + return return_value -def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter): +def _override_qos_policy_with_param(qos: QoSProfile, + policy: QoSPolicyKind, + param: Parameter) -> None: value = param.value policy_name = policy.name.lower() if policy in ( QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY, QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY ): - def capitalize_first_letter(x): + def capitalize_first_letter(x: str) -> str: return x[0].upper() + x[1:] # e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy` policy_enum_class = getattr(