diff --git a/apis/python/node/README.md b/apis/python/node/README.md
index 5909eebed..fd1d844fe 100644
--- a/apis/python/node/README.md
+++ b/apis/python/node/README.md
@@ -10,3 +10,12 @@ source .env/bin/activate
 pip install maturin
 maturin develop
 ```
+
+## Type hinting
+
+Type hinting requires running a second step:
+
+```bash
+python dora/generate_stubs.py dora dora/__init__.pyi
+maturin develop
+```
diff --git a/apis/python/node/dora/__init__.py b/apis/python/node/dora/__init__.py
deleted file mode 100644
index be8c50165..000000000
--- a/apis/python/node/dora/__init__.py
+++ /dev/null
@@ -1,32 +0,0 @@
-"""
-# dora-rs
-
-This is the dora python client for interacting with dora dataflow.
-
-You can install it via:
-
-```bash
-pip install dora-rs
-```
-"""
-
-from enum import Enum
-
-from .dora import *
-
-__author__ = "Dora-rs Authors"
-__version__ = "0.3.3"
-
-
-class DoraStatus(Enum):
-    """Dora status to indicate if operator `on_input` loop
-    should be stopped.
-
-    Args:
-        Enum (u8): Status signaling to dora operator to
-            stop or continue the operator.
-    """
-
-    CONTINUE = 0
-    STOP = 1
-    STOP_ALL = 2
diff --git a/apis/python/node/dora/__init__.pyi b/apis/python/node/dora/__init__.pyi
new file mode 100644
index 000000000..3014cc351
--- /dev/null
+++ b/apis/python/node/dora/__init__.pyi
@@ -0,0 +1,316 @@
+import pyarrow
+import typing
+
+@typing.final
+class DoraStatus:
+
+    def __eq__(self, value: typing.Any, /) -> bool:
+        """Return self==value."""
+
+    def __ge__(self, value: typing.Any, /) -> bool:
+        """Return self>=value."""
+
+    def __gt__(self, value: typing.Any, /) -> bool:
+        """Return self>value."""
+
+    def __int__(self, /) -> None:
+        """int(self)"""
+
+    def __le__(self, value: typing.Any, /) -> bool:
+        """Return self<=value."""
+
+    def __lt__(self, value: typing.Any, /) -> bool:
+        """Return self<value."""
+
+    def __ne__(self, value: typing.Any, /) -> bool:
+        """Return self!=value."""
+
+    def __repr__(self, /) -> str:
+        """Return repr(self)."""
+    CONTINUE: DoraStatus = ...
+    STOP: DoraStatus = ...
+    STOP_ALL: DoraStatus = ...
+
+@typing.final
+class Node:
+    """The custom node API lets you integrate `dora` into your application.
+It allows you to retrieve input and send output in any fashion you want.
+
+Use with:
+
+```python
+from dora import Node
+
+node = Node()
+```"""
+
+    def __init__(self, /) -> None:
+        """The custom node API lets you integrate `dora` into your application.
+It allows you to retrieve input and send output in any fashion you want.
+
+Use with:
+
+```python
+from dora import Node
+
+node = Node()
+```"""
+
+    def dataflow_descriptor(self, /) -> dict:...
+
+    def merge_external_events(self, /, subscription: Ros2Subscription) -> None:...
+
+    def next(self, /, timeout: float=None) -> PyEvent:
+        """`.next()` gives you the next input that the node has received.
+It blocks until the next event becomes available.
+You can use timeout in seconds to return if no input is available.
+It will return `None` when all senders have been dropped.
+
+```python
+event = node.next()
+```
+
+You can also iterate over the event stream with a loop
+
+```python
+for event in node:
+match event["type"]:
+case "INPUT":
+match event["id"]:
+case "image":
+```"""
+
+    def send_output(self, /, output_id: str, data: pyarrow.Array, metadata: dict=None) -> None:
+        """`send_output` sends data from the node.
+
+```python
+Args:
+output_id: str,
+data: Bytes|Arrow,
+metadata: Option[Dict],
+```
+
+```python
+node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
+```"""
+
+    def __iter__(self, /) -> typing.Any:
+        """Implement iter(self)."""
+
+    def __next__(self, /) -> typing.Any:
+        """Implement next(self)."""
+
+@typing.final
+class PyEvent:
+
+    def inner(self, /):...
+
+    def __getitem__(self, key: typing.Any, /) -> typing.Any:
+        """Return self[key]."""
+
+@typing.final
+class Ros2Context:
+    """ROS2 Context holding all messages definition for receiving and sending messages to ROS2.
+
+By default, Ros2Context will use env `AMENT_PREFIX_PATH` to search for message definition.
+
+AMENT_PREFIX_PATH folder structure should be the following:
+
+- For messages: /msg/.msg
+- For services: /srv/.srv
+
+You can also use `ros_paths` if you don't want to use env variable.
+
+```python
+context = Ros2Context()
+```
+
+list of paths to search for ROS2 message types definition"""
+
+    def __init__(self, /, ros_paths: List[str]=None) -> None:
+        """ROS2 Context holding all messages definition for receiving and sending messages to ROS2.
+
+By default, Ros2Context will use env `AMENT_PREFIX_PATH` to search for message definition.
+
+AMENT_PREFIX_PATH folder structure should be the following:
+
+- For messages: /msg/.msg
+- For services: /srv/.srv
+
+You can also use `ros_paths` if you don't want to use env variable.
+
+```python
+context = Ros2Context()
+```
+
+list of paths to search for ROS2 message types definition"""
+
+    def new_node(self, /, name: str, namespace: str, options: Ros2NodeOptions) -> Ros2Node:
+        """Create a new ROS2 node
+
+```python
+ros2_node = ros2_context.new_node(
+"turtle_teleop",
+"/ros2_demo",
+dora.experimental.ros2_bridge.Ros2NodeOptions(rosout=True),
+)
+```
+
+name of the node
+name of the namespace
+options for the node"""
+
+@typing.final
+class Ros2Durability:
+    """DDS 2.2.3.4 DURABILITY"""
+
+    def __eq__(self, value: typing.Any, /) -> bool:
+        """Return self==value."""
+
+    def __ge__(self, value: typing.Any, /) -> bool:
+        """Return self>=value."""
+
+    def __gt__(self, value: typing.Any, /) -> bool:
+        """Return self>value."""
+
+    def __int__(self, /) -> None:
+        """int(self)"""
+
+    def __le__(self, value: typing.Any, /) -> bool:
+        """Return self<=value."""
+
+    def __lt__(self, value: typing.Any, /) -> bool:
+        """Return self<value."""
+
+    def __ne__(self, value: typing.Any, /) -> bool:
+        """Return self!=value."""
+
+    def __repr__(self, /) -> str:
+        """Return repr(self)."""
+    Persistent: Ros2Durability = ...
+    Transient: Ros2Durability = ...
+    TransientLocal: Ros2Durability = ...
+    Volatile: Ros2Durability = ...
+
+@typing.final
+class Ros2Liveliness:
+    """DDS 2.2.3.11 LIVELINESS"""
+
+    def __eq__(self, value: typing.Any, /) -> bool:
+        """Return self==value."""
+
+    def __ge__(self, value: typing.Any, /) -> bool:
+        """Return self>=value."""
+
+    def __gt__(self, value: typing.Any, /) -> bool:
+        """Return self>value."""
+
+    def __int__(self, /) -> None:
+        """int(self)"""
+
+    def __le__(self, value: typing.Any, /) -> bool:
+        """Return self<=value."""
+
+    def __lt__(self, value: typing.Any, /) -> bool:
+        """Return self<value."""
+
+    def __ne__(self, value: typing.Any, /) -> bool:
+        """Return self!=value."""
+
+    def __repr__(self, /) -> str:
+        """Return repr(self)."""
+    Automatic: Ros2Liveliness = ...
+    ManualByParticipant: Ros2Liveliness = ...
+    ManualByTopic: Ros2Liveliness = ...
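For orientation, the durability and liveliness values stubbed above are meant to be passed to `Ros2QosPolicies` (stubbed further below). A minimal sketch, assuming the flat `dora` namespace used in this stub file; the concrete values are arbitrary and application-specific:

```python
from dora import Ros2Durability, Ros2Liveliness, Ros2QosPolicies

# Arbitrary example values; see the Ros2QosPolicies stub below for the
# remaining keyword parameters (keep_all, lease_duration, max_blocking_time).
topic_qos = Ros2QosPolicies(
    durability=Ros2Durability.TransientLocal,
    liveliness=Ros2Liveliness.Automatic,
    reliable=True,
    keep_last=10,
)
```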
+ +@typing.final +class Ros2Node: + """ROS2 Node + +Warnings: +- There's a known issue about ROS2 nodes not being discoverable by ROS2 +See: https://github.com/jhelovuo/ros2-client/issues/4""" + + def create_publisher(self, /, topic: Ros2Topic, qos: Ros2QosPolicies=None) -> Ros2Publisher: + """Create a ROS2 publisher + +```python +pose_publisher = ros2_node.create_publisher(turtle_pose_topic) +``` + +QoS policies for the topic""" + + def create_subscription(self, /, topic: Ros2Topic, qos: Ros2QosPolicies=None) -> Ros2Subscription: + """Create a ROS2 subscription + +```python +pose_reader = ros2_node.create_subscription(turtle_pose_topic) +``` + +QoS policies for the topic""" + + def create_topic(self, /, name: str, message_type: str, qos: Ros2QosPolicies) -> Ros2Topic: + """Create a ROS2 topic to connect to. + +```python +turtle_twist_topic = ros2_node.create_topic( +"/turtle1/cmd_vel", "geometry_msgs/Twist", topic_qos +) +``` + +name of the topic. e.g. "pose" +message type of the topic. e.g. "std_msgs::UInt8MultiArray" +QoS policies for the topic""" + +@typing.final +class Ros2NodeOptions: + """ROS2 Node Options +enable rosout logging""" + + def __init__(self, /, rosout: bool=None) -> None: + """ROS2 Node Options +enable rosout logging""" + +@typing.final +class Ros2Publisher: + """ROS2 Publisher""" + + def publish(self, /, data: pyarrow.Array) -> None: + """Publish a message into ROS2 topic. + +Remember that the data format should respect the structure of the ROS2 message usinng an arrow Structure. + +ex: +```python +gripper_command.publish( +pa.array( +[ +{ +"name": "gripper", +"cmd": np.float32(5), +} +] +), +) +```""" + +@typing.final +class Ros2QosPolicies: + """ROS2 QoS Policy""" + + def __init__(self, /, durability: Ros2Durability=None, liveliness: Ros2Liveliness=None, reliable: bool=None, keep_all: bool=None, lease_duration: float=None, max_blocking_time: float=None, keep_last: int=None) -> Ros2QoSPolicies: + """ROS2 QoS Policy""" + +@typing.final +class Ros2Subscription: + """ROS2 Subscription""" + + def next(self, /):... 
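To make the relationship between these bridge classes concrete, here is a sketch assembled from the docstring examples above (`Ros2Context`, `new_node`, `create_topic`, `create_publisher`, `publish`). It assumes the flat `dora` namespace of this stub file; the QoS values and the Twist field layout are illustrative assumptions, not part of this change:

```python
import pyarrow as pa

from dora import Ros2Context, Ros2NodeOptions, Ros2QosPolicies

# Node and topic names follow the new_node/create_topic docstring examples.
context = Ros2Context()
ros2_node = context.new_node(
    "turtle_teleop",
    "/ros2_demo",
    Ros2NodeOptions(rosout=True),
)
topic_qos = Ros2QosPolicies(reliable=True, max_blocking_time=0.1)
turtle_twist_topic = ros2_node.create_topic(
    "/turtle1/cmd_vel", "geometry_msgs/Twist", topic_qos
)
twist_writer = ros2_node.create_publisher(turtle_twist_topic)

# The struct fields must mirror the ROS2 message definition; this Twist
# layout is an assumption for illustration only.
twist_writer.publish(
    pa.array([{"linear": {"x": 1.0}, "angular": {"z": 0.5}}])
)
```

A `Ros2Subscription` created with `create_subscription` can likewise be merged into the dora event loop via `Node.merge_external_events`, as documented in the `Node` stub above.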
+ +@typing.final +class Ros2Topic: + """ROS2 Topic +enable rosout logging""" + +def start_runtime() -> None: + """Start a runtime for Operators""" \ No newline at end of file diff --git a/apis/python/node/dora/generate_stubs.py b/apis/python/node/dora/generate_stubs.py new file mode 100644 index 000000000..47723489a --- /dev/null +++ b/apis/python/node/dora/generate_stubs.py @@ -0,0 +1,512 @@ +import argparse +import ast +import importlib +import inspect +import logging +import re +import subprocess +from functools import reduce +from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union + + +def path_to_type(*elements: str) -> ast.AST: + base: ast.AST = ast.Name(id=elements[0], ctx=ast.Load()) + for e in elements[1:]: + base = ast.Attribute(value=base, attr=e, ctx=ast.Load()) + return base + + +OBJECT_MEMBERS = dict(inspect.getmembers(object)) +BUILTINS: Dict[str, Union[None, Tuple[List[ast.AST], ast.AST]]] = { + "__annotations__": None, + "__bool__": ([], path_to_type("bool")), + "__bytes__": ([], path_to_type("bytes")), + "__class__": None, + "__contains__": ([path_to_type("typing", "Any")], path_to_type("bool")), + "__del__": None, + "__delattr__": ([path_to_type("str")], path_to_type("None")), + "__delitem__": ([path_to_type("typing", "Any")], path_to_type("typing", "Any")), + "__dict__": None, + "__dir__": None, + "__doc__": None, + "__eq__": ([path_to_type("typing", "Any")], path_to_type("bool")), + "__format__": ([path_to_type("str")], path_to_type("str")), + "__ge__": ([path_to_type("typing", "Any")], path_to_type("bool")), + "__getattribute__": ([path_to_type("str")], path_to_type("typing", "Any")), + "__getitem__": ([path_to_type("typing", "Any")], path_to_type("typing", "Any")), + "__gt__": ([path_to_type("typing", "Any")], path_to_type("bool")), + "__hash__": ([], path_to_type("int")), + "__init__": ([], path_to_type("None")), + "__init_subclass__": None, + "__iter__": ([], path_to_type("typing", "Any")), + "__le__": ([path_to_type("typing", "Any")], path_to_type("bool")), + "__len__": ([], path_to_type("int")), + "__lt__": ([path_to_type("typing", "Any")], path_to_type("bool")), + "__module__": None, + "__ne__": ([path_to_type("typing", "Any")], path_to_type("bool")), + "__new__": None, + "__next__": ([], path_to_type("typing", "Any")), + "__int__": ([], path_to_type("None")), + "__reduce__": None, + "__reduce_ex__": None, + "__repr__": ([], path_to_type("str")), + "__setattr__": ( + [path_to_type("str"), path_to_type("typing", "Any")], + path_to_type("None"), + ), + "__setitem__": ( + [path_to_type("typing", "Any"), path_to_type("typing", "Any")], + path_to_type("typing", "Any"), + ), + "__sizeof__": None, + "__str__": ([], path_to_type("str")), + "__subclasshook__": None, +} + + +def module_stubs(module: Any) -> ast.Module: + types_to_import = {"typing"} + classes = [] + functions = [] + for member_name, member_value in inspect.getmembers(module): + element_path = [module.__name__, member_name] + if member_name.startswith("__"): + pass + elif inspect.isclass(member_value): + classes.append( + class_stubs(member_name, member_value, element_path, types_to_import) + ) + elif inspect.isbuiltin(member_value): + functions.append( + function_stub( + member_name, + member_value, + element_path, + types_to_import, + in_class=False, + ) + ) + else: + logging.warning(f"Unsupported root construction {member_name}") + return ast.Module( + body=[ast.Import(names=[ast.alias(name=t)]) for t in sorted(types_to_import)] + + classes + + functions, + type_ignores=[], + ) + + +def 
class_stubs( + cls_name: str, cls_def: Any, element_path: List[str], types_to_import: Set[str] +) -> ast.ClassDef: + attributes: List[ast.AST] = [] + methods: List[ast.AST] = [] + magic_methods: List[ast.AST] = [] + constants: List[ast.AST] = [] + for member_name, member_value in inspect.getmembers(cls_def): + current_element_path = [*element_path, member_name] + if member_name == "__init__": + try: + inspect.signature(cls_def) # we check it actually exists + methods = [ + function_stub( + member_name, + cls_def, + current_element_path, + types_to_import, + in_class=True, + ), + *methods, + ] + except ValueError as e: + if "no signature found" not in str(e): + raise ValueError( + f"Error while parsing signature of {cls_name}.__init_" + ) from e + elif ( + member_value == OBJECT_MEMBERS.get(member_name) + or BUILTINS.get(member_name, ()) is None + ): + pass + elif inspect.isdatadescriptor(member_value): + attributes.extend( + data_descriptor_stub( + member_name, member_value, current_element_path, types_to_import + ) + ) + elif inspect.isroutine(member_value): + (magic_methods if member_name.startswith("__") else methods).append( + function_stub( + member_name, + member_value, + current_element_path, + types_to_import, + in_class=True, + ) + ) + elif member_name == "__match_args__": + constants.append( + ast.AnnAssign( + target=ast.Name(id=member_name, ctx=ast.Store()), + annotation=ast.Subscript( + value=path_to_type("tuple"), + slice=ast.Tuple( + elts=[path_to_type("str"), ast.Ellipsis()], ctx=ast.Load() + ), + ctx=ast.Load(), + ), + value=ast.Constant(member_value), + simple=1, + ) + ) + elif member_value is not None: + constants.append( + ast.AnnAssign( + target=ast.Name(id=member_name, ctx=ast.Store()), + annotation=concatenated_path_to_type( + member_value.__class__.__name__, element_path, types_to_import + ), + value=ast.Ellipsis(), + simple=1, + ) + ) + else: + logging.warning( + f"Unsupported member {member_name} of class {'.'.join(element_path)}" + ) + + doc = inspect.getdoc(cls_def) + doc_comment = build_doc_comment(doc) if doc else None + return ast.ClassDef( + cls_name, + bases=[], + keywords=[], + body=( + ([doc_comment] if doc_comment else []) + + attributes + + methods + + magic_methods + + constants + ) + or [ast.Ellipsis()], + decorator_list=[path_to_type("typing", "final")], + ) + + +def data_descriptor_stub( + data_desc_name: str, + data_desc_def: Any, + element_path: List[str], + types_to_import: Set[str], +) -> Union[Tuple[ast.AnnAssign, ast.Expr], Tuple[ast.AnnAssign]]: + annotation = None + doc_comment = None + + doc = inspect.getdoc(data_desc_def) + if doc is not None: + annotation = returns_stub(data_desc_name, doc, element_path, types_to_import) + m = re.findall(r"^ *:return: *(.*) *$", doc, re.MULTILINE) + if len(m) == 1: + doc_comment = m[0] + elif len(m) > 1: + raise ValueError( + f"Multiple return annotations found with :return: in {'.'.join(element_path)} documentation" + ) + + assign = ast.AnnAssign( + target=ast.Name(id=data_desc_name, ctx=ast.Store()), + annotation=annotation or path_to_type("typing", "Any"), + simple=1, + ) + doc_comment = build_doc_comment(doc_comment) if doc_comment else None + return (assign, doc_comment) if doc_comment else (assign,) + + +def function_stub( + fn_name: str, + fn_def: Any, + element_path: List[str], + types_to_import: Set[str], + *, + in_class: bool, +) -> ast.FunctionDef: + body: List[ast.AST] = [] + doc = inspect.getdoc(fn_def) + if doc is not None: + doc_comment = build_doc_comment(doc) + if doc_comment is not None: 
+ body.append(doc_comment) + + decorator_list = [] + if in_class and hasattr(fn_def, "__self__"): + decorator_list.append(ast.Name("staticmethod")) + + return ast.FunctionDef( + fn_name, + arguments_stub(fn_name, fn_def, doc or "", element_path, types_to_import), + body or [ast.Ellipsis()], + decorator_list=decorator_list, + returns=( + returns_stub(fn_name, doc, element_path, types_to_import) if doc else None + ), + lineno=0, + ) + + +def arguments_stub( + callable_name: str, + callable_def: Any, + doc: str, + element_path: List[str], + types_to_import: Set[str], +) -> ast.arguments: + real_parameters: Mapping[str, inspect.Parameter] = inspect.signature( + callable_def + ).parameters + if callable_name == "__init__": + real_parameters = { + "self": inspect.Parameter("self", inspect.Parameter.POSITIONAL_ONLY), + **real_parameters, + } + + parsed_param_types = {} + optional_params = set() + + # Types for magic functions types + builtin = BUILTINS.get(callable_name) + if isinstance(builtin, tuple): + param_names = list(real_parameters.keys()) + if param_names and param_names[0] == "self": + del param_names[0] + for name, t in zip(param_names, builtin[0]): + parsed_param_types[name] = t + + # Types from comment + for match in re.findall( + r"^ *:type *([a-zA-Z0-9_]+): ([^\n]*) *$", doc, re.MULTILINE + ): + if match[0] not in real_parameters: + raise ValueError( + f"The parameter {match[0]} of {'.'.join(element_path)} " + "is defined in the documentation but not in the function signature" + ) + type = match[1] + if type.endswith(", optional"): + optional_params.add(match[0]) + type = type[:-10] + parsed_param_types[match[0]] = convert_type_from_doc( + type, element_path, types_to_import + ) + + # we parse the parameters + posonlyargs = [] + args = [] + vararg = None + kwonlyargs = [] + kw_defaults = [] + kwarg = None + defaults = [] + for param in real_parameters.values(): + if param.name != "self" and param.name not in parsed_param_types: + raise ValueError( + f"The parameter {param.name} of {'.'.join(element_path)} " + "has no type definition in the function documentation" + ) + param_ast = ast.arg( + arg=param.name, annotation=parsed_param_types.get(param.name) + ) + + default_ast = None + if param.default != param.empty: + default_ast = ast.Constant(param.default) + if param.name not in optional_params: + raise ValueError( + f"Parameter {param.name} of {'.'.join(element_path)} " + "is optional according to the type but not flagged as such in the doc" + ) + elif param.name in optional_params: + raise ValueError( + f"Parameter {param.name} of {'.'.join(element_path)} " + "is optional according to the documentation but has no default value" + ) + + if param.kind == param.POSITIONAL_ONLY: + posonlyargs.append(param_ast) + defaults.append(default_ast) + elif param.kind == param.POSITIONAL_OR_KEYWORD: + args.append(param_ast) + defaults.append(default_ast) + elif param.kind == param.VAR_POSITIONAL: + vararg = param_ast + elif param.kind == param.KEYWORD_ONLY: + kwonlyargs.append(param_ast) + kw_defaults.append(default_ast) + elif param.kind == param.VAR_KEYWORD: + kwarg = param_ast + + return ast.arguments( + posonlyargs=posonlyargs, + args=args, + vararg=vararg, + kwonlyargs=kwonlyargs, + kw_defaults=kw_defaults, + defaults=defaults, + kwarg=kwarg, + ) + + +def returns_stub( + callable_name: str, doc: str, element_path: List[str], types_to_import: Set[str] +) -> Optional[ast.AST]: + m = re.findall(r"^ *:rtype: *([^\n]*) *$", doc, re.MULTILINE) + if len(m) == 0: + builtin = 
BUILTINS.get(callable_name) + if isinstance(builtin, tuple) and builtin[1] is not None: + return builtin[1] + raise ValueError( + f"The return type of {'.'.join(element_path)} " + "has no type definition using :rtype: in the function documentation" + ) + if len(m) > 1: + raise ValueError( + f"Multiple return type annotations found with :rtype: for {'.'.join(element_path)}" + ) + return convert_type_from_doc(m[0], element_path, types_to_import) + + +def convert_type_from_doc( + type_str: str, element_path: List[str], types_to_import: Set[str] +) -> ast.AST: + type_str = type_str.strip() + return parse_type_to_ast(type_str, element_path, types_to_import) + + +def parse_type_to_ast( + type_str: str, element_path: List[str], types_to_import: Set[str] +) -> ast.AST: + # let's tokenize + tokens = [] + current_token = "" + for c in type_str: + if "a" <= c <= "z" or "A" <= c <= "Z" or c == ".": + current_token += c + else: + if current_token: + tokens.append(current_token) + current_token = "" + if c != " ": + tokens.append(c) + if current_token: + tokens.append(current_token) + + # let's first parse nested parenthesis + stack: List[List[Any]] = [[]] + for token in tokens: + if token == "[": + children: List[str] = [] + stack[-1].append(children) + stack.append(children) + elif token == "]": + stack.pop() + else: + stack[-1].append(token) + + # then it's easy + def parse_sequence(sequence: List[Any]) -> ast.AST: + # we split based on "or" + or_groups: List[List[str]] = [[]] + print(sequence) + # TODO: Fix sequence + if "Ros" in sequence and "2" in sequence: + sequence = ["".join(sequence)] + + for e in sequence: + if e == "or": + or_groups.append([]) + else: + or_groups[-1].append(e) + if any(not g for g in or_groups): + raise ValueError( + f"Not able to parse type '{type_str}' used by {'.'.join(element_path)}" + ) + + new_elements: List[ast.AST] = [] + for group in or_groups: + if len(group) == 1 and isinstance(group[0], str): + new_elements.append( + concatenated_path_to_type(group[0], element_path, types_to_import) + ) + elif ( + len(group) == 2 + and isinstance(group[0], str) + and isinstance(group[1], list) + ): + new_elements.append( + ast.Subscript( + value=concatenated_path_to_type( + group[0], element_path, types_to_import + ), + slice=parse_sequence(group[1]), + ctx=ast.Load(), + ) + ) + else: + raise ValueError( + f"Not able to parse type '{type_str}' used by {'.'.join(element_path)}" + ) + return reduce( + lambda left, right: ast.BinOp(left=left, op=ast.BitOr(), right=right), + new_elements, + ) + + return parse_sequence(stack[0]) + + +def concatenated_path_to_type( + path: str, element_path: List[str], types_to_import: Set[str] +) -> ast.AST: + parts = path.split(".") + if any(not p for p in parts): + raise ValueError( + f"Not able to parse type '{path}' used by {'.'.join(element_path)}" + ) + if len(parts) > 1: + types_to_import.add(".".join(parts[:-1])) + return path_to_type(*parts) + + +def build_doc_comment(doc: str) -> Optional[ast.Expr]: + lines = [line.strip() for line in doc.split("\n")] + clean_lines = [] + for line in lines: + if line.startswith((":type", ":rtype")): + continue + clean_lines.append(line) + text = "\n".join(clean_lines).strip() + return ast.Expr(value=ast.Constant(text)) if text else None + + +def format_with_ruff(file: str) -> None: + subprocess.check_call(["python", "-m", "ruff", "format", file]) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Extract Python type stub from a python module." 
+ ) + parser.add_argument( + "module_name", help="Name of the Python module for which generate stubs" + ) + parser.add_argument( + "out", + help="Name of the Python stub file to write to", + type=argparse.FileType("wt"), + ) + parser.add_argument( + "--ruff", help="Formats the generated stubs using Ruff", action="store_true" + ) + args = parser.parse_args() + stub_content = ast.unparse(module_stubs(importlib.import_module(args.module_name))) + args.out.write(stub_content) + if args.ruff: + format_with_ruff(args.out.name) diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 0d6714f7b..3bc03ebc0 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -43,6 +43,7 @@ impl Node { /// `.next()` gives you the next input that the node has received. /// It blocks until the next event becomes available. + /// You can use timeout in seconds to return if no input is available. /// It will return `None` when all senders has been dropped. /// /// ```python @@ -58,17 +59,41 @@ impl Node { /// match event["id"]: /// case "image": /// ``` + /// :type timeout: float, optional + /// :rtype: PyEvent #[allow(clippy::should_implement_trait)] pub fn next(&mut self, py: Python, timeout: Option) -> PyResult> { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); Ok(event) } + /// You can iterate over the event stream with a loop + /// + /// ```python + /// for event in node: + /// match event["type"]: + /// case "INPUT": + /// match event["id"]: + /// case "image": + /// ``` + /// + /// :rtype: PyEvent pub fn __next__(&mut self, py: Python) -> PyResult> { let event = py.allow_threads(|| self.events.recv(None)); Ok(event) } + /// You can iterate over the event stream with a loop + /// + /// ```python + /// for event in node: + /// match event["type"]: + /// case "INPUT": + /// match event["id"]: + /// case "image": + /// ``` + /// + /// :rtype: PyEvent fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } @@ -85,7 +110,10 @@ impl Node { /// ```python /// node.send_output("string", b"string", {"open_telemetry_context": "7632e76"}) /// ``` - /// + /// :type output_id: str + /// :type data: pyarrow.Array + /// :type metadata: dict, optional + /// :rtype: None pub fn send_output( &mut self, output_id: String, @@ -113,13 +141,20 @@ impl Node { Ok(()) } - /// Returns the full dataflow descriptor that this node is part of. + // Returns the full dataflow descriptor that this node is part of. + // + // This method returns the parsed dataflow YAML file. /// - /// This method returns the parsed dataflow YAML file. + /// :rtype: dict pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result { pythonize::pythonize(py, self.node.dataflow_descriptor()) } + // Merge an external event stream with dora main loop. + // This currently only work with ROS2. 
+ /// + /// :type subscription: Ros2Subscription + /// :rtype: None pub fn merge_external_events( &mut self, subscription: &mut Ros2Subscription, @@ -197,21 +232,42 @@ impl Node { } /// Start a runtime for Operators +/// +/// :rtype: None #[pyfunction] pub fn start_runtime() -> eyre::Result<()> { dora_runtime::main().wrap_err("Dora Runtime raised an error.") } +/// :rtype: DoraStatus +#[derive(Copy, Clone, Debug, PartialEq)] +#[pyclass] +pub enum DoraStatus { + CONTINUE, + STOP, + STOP_ALL, +} + +impl DoraStatus { + /// :rtype: int + pub fn value(self) -> i32 { + match self { + DoraStatus::CONTINUE => 0, + DoraStatus::STOP => 1, + DoraStatus::STOP_ALL => 2, + } + } +} + #[pymodule] -fn dora(py: Python, m: &PyModule) -> PyResult<()> { +fn dora(_py: Python, m: &PyModule) -> PyResult<()> { + dora_ros2_bridge_python::create_dora_ros2_bridge_module(m)?; m.add_function(wrap_pyfunction!(start_runtime, m)?)?; - m.add_class::().unwrap(); - - let ros2_bridge = PyModule::new(py, "ros2_bridge")?; - dora_ros2_bridge_python::create_dora_ros2_bridge_module(ros2_bridge)?; - let experimental = PyModule::new(py, "experimental")?; - experimental.add_submodule(ros2_bridge)?; - m.add_submodule(experimental)?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; + m.setattr("__author__", "Dora-rs Authors")?; Ok(()) } diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index f4712e92b..d4c3e475e 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -3,6 +3,7 @@ use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; use pyo3::{exceptions::PyLookupError, prelude::*, types::PyDict}; +// Dora Event #[pyclass] pub struct PyEvent { event: MergedEvent, @@ -11,6 +12,8 @@ pub struct PyEvent { #[pymethods] impl PyEvent { + /// + /// :rtype: PyObject pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult> { if key == "kind" { let kind = match &self.event { @@ -35,7 +38,14 @@ impl PyEvent { }; Ok(value) } - MergedEvent::External(event) => event.call_method1(py, "__getitem__", (key,)).map(Some), + MergedEvent::External(event) => { + let value = match key { + "value" => event, + _ => todo!(), + }; + + Ok(Some(value.clone())) + } } } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 8ebbbfc2d..fefc92a48 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -224,8 +224,9 @@ pub fn run( .map_err(traceback); match status_enum { Ok(status_enum) => { - let status_val = Python::with_gil(|py| status_enum.getattr(py, "value")) - .wrap_err("on_event must have enum return value")?; + let status_val = + Python::with_gil(|py| status_enum.call_method0(py, "value")) + .wrap_err("on_event must have enum return value")?; Python::with_gil(|py| status_val.extract(py)) .wrap_err("on_event has invalid return value") } diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index ad4b7757a..d42a3be85 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -17,6 +17,7 @@ mod validate; mod visualize; pub const SHELL_SOURCE: &str = "shell"; +/// Dataflow description #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct Descriptor { diff --git a/libraries/extensions/ros2-bridge/python/src/lib.rs b/libraries/extensions/ros2-bridge/python/src/lib.rs index 
a07a7fbe7..a0107b583 100644 --- a/libraries/extensions/ros2-bridge/python/src/lib.rs +++ b/libraries/extensions/ros2-bridge/python/src/lib.rs @@ -23,6 +23,24 @@ use typed::{deserialize::StructDeserializer, TypeInfo, TypedValue}; pub mod qos; pub mod typed; +/// ROS2 Context holding all messages definition for receiving and sending messages to ROS2. +/// +/// By default, Ros2Context will use env `AMENT_PREFIX_PATH` to search for message definition. +/// +/// AMENT_PREFIX_PATH folder structure should be the following: +/// +/// - For messages: /msg/.msg +/// - For services: /srv/.srv +/// +/// You can also use `ros_paths` if you don't want to use env variable. +/// +/// ```python +/// context = Ros2Context() +/// ``` +/// +/// :type ros_paths: List[str], optional +/// list of paths to search for ROS2 message types defintion +/// #[pyclass] pub struct Ros2Context { context: ros2_client::Context, @@ -31,6 +49,7 @@ pub struct Ros2Context { #[pymethods] impl Ros2Context { + /// Create a new context #[new] pub fn new(ros_paths: Option>) -> eyre::Result { let ament_prefix_path = std::env::var("AMENT_PREFIX_PATH"); @@ -72,6 +91,22 @@ impl Ros2Context { } /// Create a new ROS2 node + /// + /// ```python + /// ros2_node = ros2_context.new_node( + /// "turtle_teleop", + /// "/ros2_demo", + /// dora.experimental.ros2_bridge.Ros2NodeOptions(rosout=True), + /// ) + /// ``` + /// + /// :type name: str + /// name of the node + /// :type namespace: str + /// name of the namespace + /// :type options: Ros2NodeOptions + /// options for the node + /// :rtype: Ros2Node pub fn new_node( &self, name: &str, @@ -90,6 +125,12 @@ impl Ros2Context { } } +/// ROS2 Node +/// +/// Warnings: +/// - There's a known issue about ROS2 nodes not being discoverable by ROS2 +/// See: https://github.com/jhelovuo/ros2-client/issues/4 +/// #[pyclass] pub struct Ros2Node { node: ros2_client::Node, @@ -98,6 +139,21 @@ pub struct Ros2Node { #[pymethods] impl Ros2Node { + /// Create a ROS2 topic to connect to. + /// + /// ```python + /// turtle_twist_topic = ros2_node.create_topic( + /// "/turtle1/cmd_vel", "geometry_msgs/Twist", topic_qos + /// ) + /// ``` + /// + /// :type name: str + /// name of the topic. e.g. "pose" + /// :type message_type: str + /// message type of the topic. e.g. 
"std_msgs::UInt8MultiArray" + /// :type qos: Ros2QosPolicies + /// QoS policies for the topic + /// :rtype: Ros2Topic pub fn create_topic( &self, name: &str, @@ -126,6 +182,16 @@ impl Ros2Node { Ok(Ros2Topic { topic, type_info }) } + /// Create a ROS2 publisher + /// + /// ```python + /// pose_publisher = ros2_node.create_publisher(turtle_pose_topic) + /// ``` + /// + /// :type topic: Ros2Topic + /// :type qos: Ros2QosPolicies, optional + /// QoS policies for the topic + /// :rtype: Ros2Publisher pub fn create_publisher( &mut self, topic: &Ros2Topic, @@ -140,6 +206,16 @@ impl Ros2Node { }) } + /// Create a ROS2 subscription + /// + /// ```python + /// pose_reader = ros2_node.create_subscription(turtle_pose_topic) + /// ``` + /// + /// :type topic: Ros2Topic + /// :type qos: Ros2QosPolicies, optional + /// QoS policies for the topic + /// :rtype: Ros2Subscription pub fn create_subscription( &mut self, topic: &Ros2Topic, @@ -155,6 +231,10 @@ impl Ros2Node { } } +/// ROS2 Node Options +/// :type rosout: bool, optional +/// enable rosout logging +/// #[derive(Debug, Clone, Default)] #[pyclass] #[non_exhaustive] @@ -178,6 +258,10 @@ impl From for ros2_client::NodeOptions { } } +/// ROS2 Topic +/// :type rosout: bool, optional +/// enable rosout logging +/// #[pyclass] #[non_exhaustive] pub struct Ros2Topic { @@ -185,6 +269,7 @@ pub struct Ros2Topic { type_info: TypeInfo<'static>, } +/// ROS2 Publisher #[pyclass] #[non_exhaustive] pub struct Ros2Publisher { @@ -194,6 +279,27 @@ pub struct Ros2Publisher { #[pymethods] impl Ros2Publisher { + /// Publish a message into ROS2 topic. + /// + /// Remember that the data format should respect the structure of the ROS2 message usinng an arrow Structure. + /// + /// ex: + /// ```python + /// gripper_command.publish( + /// pa.array( + /// [ + /// { + /// "name": "gripper", + /// "cmd": np.float32(5), + /// } + /// ] + /// ), + /// ) + /// ``` + /// + /// :type data: pyarrow.Array + /// :rtype: None + /// pub fn publish(&self, data: &PyAny) -> eyre::Result<()> { let pyarrow = PyModule::import(data.py(), "pyarrow")?; @@ -228,6 +334,8 @@ impl Ros2Publisher { } } +/// ROS2 Subscription +/// #[pyclass] #[non_exhaustive] pub struct Ros2Subscription { diff --git a/libraries/extensions/ros2-bridge/python/src/qos.rs b/libraries/extensions/ros2-bridge/python/src/qos.rs index 398cfe5f3..1421dfb5b 100644 --- a/libraries/extensions/ros2-bridge/python/src/qos.rs +++ b/libraries/extensions/ros2-bridge/python/src/qos.rs @@ -1,8 +1,19 @@ use ::dora_ros2_bridge::rustdds::{self, policy}; use pyo3::prelude::{pyclass, pymethods}; +/// ROS2 QoS Policy +/// +/// :type durability: Ros2Durability, optional +/// :type liveliness: Ros2Liveliness, optional +/// :type reliable: bool, optional +/// :type keep_all: bool, optional +/// :type lease_duration: float, optional +/// :type max_blocking_time: float, optional +/// :type keep_last: int, optional +/// :rtype: Ros2QoSPolicies +/// #[derive(Debug, Clone)] -#[pyclass(get_all, set_all)] +#[pyclass] #[non_exhaustive] pub struct Ros2QosPolicies { pub durability: Ros2Durability, @@ -64,6 +75,8 @@ impl From for rustdds::QosPolicies { } /// DDS 2.2.3.4 DURABILITY +/// +/// :rtype: Ros2Durability #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[pyclass] pub enum Ros2Durability { @@ -73,7 +86,11 @@ pub enum Ros2Durability { Persistent, } +/// :type value: Ros2Durability +/// :rtype: Ros2Durability impl From for policy::Durability { + /// :type value: Ros2Durability + /// :rtype: Ros2Durability fn from(value: 
Ros2Durability) -> Self { match value { Ros2Durability::Volatile => policy::Durability::Volatile, @@ -85,6 +102,7 @@ impl From for policy::Durability { } /// DDS 2.2.3.11 LIVELINESS +/// :rtype: Ros2Liveliness #[derive(Copy, Clone, Debug, PartialEq)] #[pyclass] pub enum Ros2Liveliness { @@ -94,6 +112,8 @@ pub enum Ros2Liveliness { } impl Ros2Liveliness { + /// :type lease_duration: float + /// :rtype: Ros2Liveliness fn convert(self, lease_duration: f64) -> policy::Liveliness { let lease_duration = if lease_duration.is_infinite() { rustdds::Duration::INFINITE