Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add types to qos_overriding_options.py #1248

Merged
merged 3 commits into from
Mar 24, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions rclpy/rclpy/qos_overriding_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
from typing import Text
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeAlias
from typing import Union

from rcl_interfaces.msg import ParameterDescriptor
Expand All @@ -28,8 +30,12 @@
from rclpy.exceptions import ParameterAlreadyDeclaredException
from rclpy.parameter import Parameter
from rclpy.publisher import Publisher
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSLivelinessPolicy
from rclpy.qos import QoSPolicyKind
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.subscription import Subscription

if TYPE_CHECKING:
Expand All @@ -41,7 +47,7 @@ class InvalidQosOverridesError(Exception):


# Return type of qos validation callbacks
QosCallbackResult = SetParametersResult
QosCallbackResult: TypeAlias = SetParametersResult
# Qos callback type annotation
QosCallbackType = Callable[[QoSProfile], QosCallbackResult]

Expand Down Expand Up @@ -103,7 +109,7 @@ def _declare_qos_parameters(
topic_name: Text,
qos: QoSProfile,
options: QoSOverridingOptions
) -> QoSProfile:
) -> None:
"""
Declare QoS parameters for a Publisher or a Subscription.

Expand Down Expand Up @@ -143,37 +149,42 @@ def _declare_qos_parameters(
f"{description.format('Provided QoS overrides')}, are not valid: {result.reason}")


def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]):
def _get_allowed_policies(entity_type: Union[Type[Publisher],
Type[Subscription]]) -> List[QoSPolicyKind]:
allowed_policies = list(QoSPolicyKind.__members__.values())
if issubclass(entity_type, Subscription):
allowed_policies.remove(QoSPolicyKind.LIFESPAN)
return allowed_policies


QoSProfileAttributes = Union[QoSHistoryPolicy, int, QoSReliabilityPolicy, QoSDurabilityPolicy,
Duration, QoSLivelinessPolicy, bool]


def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]:
value = getattr(qos, policy.name.lower())
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
):
value = value.name.lower()
if value == 'unknown':
value: QoSProfileAttributes = getattr(qos, policy.name.lower())
if isinstance(value, (QoSHistoryPolicy, QoSReliabilityPolicy,
QoSDurabilityPolicy, QoSLivelinessPolicy)):
return_value: Union[str, int, bool] = value.name.lower()
if return_value == 'unknown':
raise ValueError('User provided QoS profile is invalid')
if policy in (
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
):
value = value.nanoseconds
return value
elif isinstance(value, Duration):
return_value = value.nanoseconds
else:
return_value = value
return return_value


def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter):
def _override_qos_policy_with_param(qos: QoSProfile,
policy: QoSPolicyKind,
param: Parameter) -> None:
value = param.value
policy_name = policy.name.lower()
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
):
def capitalize_first_letter(x):
def capitalize_first_letter(x: str) -> str:
return x[0].upper() + x[1:]
# e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy`
policy_enum_class = getattr(
Expand Down