From c44e0b3778c4823d15d50cc27173e805156a2b5c Mon Sep 17 00:00:00 2001 From: Alan Greer Date: Thu, 19 Dec 2024 15:53:07 +0000 Subject: [PATCH] Added command handler and command parameter parsing. --- src/fastcs_odin/odin_adapter_controller.py | 83 ++++++++++++++++------ src/fastcs_odin/util.py | 46 +++++++----- 2 files changed, 90 insertions(+), 39 deletions(-) diff --git a/src/fastcs_odin/odin_adapter_controller.py b/src/fastcs_odin/odin_adapter_controller.py index 9fb837a..09509e9 100644 --- a/src/fastcs_odin/odin_adapter_controller.py +++ b/src/fastcs_odin/odin_adapter_controller.py @@ -59,6 +59,27 @@ async def update( logging.error("Update loop failed for %s:\n%s", self.path, e) +@dataclass +class CommandHandler(Handler): + path: str + command: str + + async def put( + self, + controller: "OdinAdapterController", + attr: AttrW[Any], + value: Any, + ) -> None: + try: + logging.debug("Sending command %s to %s", self.command, self.path) + response = await controller.connection.put(self.path, self.command) + match response: + case {"error": error}: + raise AdapterResponseError(error) + except Exception as e: + logging.error("Command put %s = %s failed:\n%s", self.path, value, e) + + T = TypeVar("T") @@ -184,33 +205,49 @@ def _process_parameters(self): def _create_attributes(self): """Create controller ``Attributes`` from ``OdinParameters``.""" for parameter in self.parameters: - if "writeable" in parameter.metadata and parameter.metadata["writeable"]: - attr_class = AttrRW - else: - attr_class = AttrR - - if parameter.metadata["type"] not in types: - logging.warning(f"Could not handle parameter {parameter}") - # this is really something I should handle here - continue - - allowed = ( - parameter.metadata["allowed_values"] - if "allowed_values" in parameter.metadata - else None - ) - if len(parameter.path) >= 2: group = snake_to_pascal(f"{parameter.path[0]}") else: group = None - attr = attr_class( - types[parameter.metadata["type"]], - handler=ParamTreeHandler( - "/".join([self._api_prefix] + parameter.uri), allowed_values=allowed - ), - group=group, - ) + if "command" in parameter.uri and "execute" not in parameter.uri: + command_uri = list(parameter.uri) + command_uri[-1] = "execute" + attr = AttrW( + types["int"], + handler=CommandHandler( + "/".join([self._api_prefix] + command_uri), parameter.name + ), + group=group, + ) + + else: + if ( + "writeable" in parameter.metadata + and parameter.metadata["writeable"] + ): + attr_class = AttrRW + else: + attr_class = AttrR + + if parameter.metadata["type"] not in types: + logging.warning(f"Could not handle parameter {parameter}") + # this is really something I should handle here + continue + + allowed = ( + parameter.metadata["allowed_values"] + if "allowed_values" in parameter.metadata + else None + ) + + attr = attr_class( + types[parameter.metadata["type"]], + handler=ParamTreeHandler( + "/".join([self._api_prefix] + parameter.uri), + allowed_values=allowed, + ), + group=group, + ) setattr(self, parameter.name.replace(".", ""), attr) diff --git a/src/fastcs_odin/util.py b/src/fastcs_odin/util.py index 310e2f1..91f5b78 100644 --- a/src/fastcs_odin/util.py +++ b/src/fastcs_odin/util.py @@ -84,23 +84,37 @@ def _walk_odin_metadata( yield from _walk_odin_metadata(sub_node, sub_node_path) else: # Leaves - if isinstance(node_value, dict) and is_metadata_object(node_value): - yield (node_path, node_value) - elif isinstance(node_value, list): - if "config" in node_path: - # Split list into separate parameters so they can be set - for idx, sub_node_value in enumerate(node_value): - sub_node_path = node_path + [str(idx)] - yield ( - sub_node_path, - infer_metadata(sub_node_value, sub_node_path), - ) - else: - # Convert read-only list to a string for display - yield (node_path, infer_metadata(str(node_value), node_path)) + if "command" in node_path and "allowed" in node_path: + cmds = node_value["value"] + for cmd in cmds: + command_path = list(node_path) + command_path[-1] = cmd + command_value = dict(node_value) + command_value["value"] = cmd + command_value["writeable"] = True + yield (command_path, command_value) else: - # TODO: This won't be needed when all parameters provide metadata - yield (node_path, infer_metadata(node_value, node_path)) + if isinstance(node_value, dict) and is_metadata_object(node_value): + yield (node_path, node_value) + elif isinstance(node_value, list): + if "config" in node_path: + # Split list into separate parameters so they can be set + for idx, sub_node_value in enumerate(node_value): + sub_node_path = node_path + [str(idx)] + yield ( + sub_node_path, + infer_metadata(sub_node_value, sub_node_path), + ) + elif "command" in node_path: + print(f"HERE 2 node_name: {node_name}") + print(f"node_path: {node_path}") + print(f"node_value: {node_value}") + else: + # Convert read-only list to a string for display + yield (node_path, infer_metadata(str(node_value), node_path)) + else: + # TODO: This won't be needed when all parameters provide metadata + yield (node_path, infer_metadata(node_value, node_path)) def infer_metadata(parameter: Any, uri: list[str]):