From 1456950fc3824c899412ebfb08428545442afc41 Mon Sep 17 00:00:00 2001 From: Terence Hampson Date: Tue, 10 Jan 2023 11:53:21 -0500 Subject: [PATCH] Initial work to get attribute subscription for chip-repl yamltest runner (#24242) * Initial work to get attribute subscrition for chip-repl yamltest runner * Address PR comments * Restyle * Restyle * Restyle --- src/controller/python/chip/yaml/runner.py | 169 +++++++++++++++++++++- 1 file changed, 166 insertions(+), 3 deletions(-) diff --git a/src/controller/python/chip/yaml/runner.py b/src/controller/python/chip/yaml/runner.py index c901c0ee748204..d5dde80ea8454f 100644 --- a/src/controller/python/chip/yaml/runner.py +++ b/src/controller/python/chip/yaml/runner.py @@ -17,15 +17,16 @@ import asyncio as asyncio import logging +import queue from abc import ABC, abstractmethod -from dataclasses import dataclass +from dataclasses import dataclass, field from enum import Enum import chip.interaction_model import chip.yaml.format_converter as Converter import stringcase from chip import ChipDeviceCtrl -from chip.clusters.Attribute import AttributeStatus, ValueDecodeFailure +from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath, ValueDecodeFailure from chip.yaml.errors import ParsingError, UnexpectedParsingError from .data_model_lookup import * @@ -44,11 +45,24 @@ class _ActionResult: response: object +@dataclass +class _AttributeSubscriptionCallbackResult: + name: str + attribute_path: TypedAttributePath + result: _ActionResult + + @dataclass class _ExecutionContext: ''' Objects that is commonly passed around this file that are vital to test execution.''' # Data model lookup to get python attribute, cluster, command object. data_model_lookup: DataModelLookup = None + # List of subscriptions. + subscriptions: list = field(default_factory=list) + # The key is the attribute/event name, and the value is a queue of subscription callback results + # that been sent by device under test. For attribute subscription the queue is of type + # _AttributeSubscriptionCallbackResult. + subscription_callback_result_queue: dict = field(default_factory=dict) class BaseAction(ABC): @@ -175,6 +189,9 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: except chip.interaction_model.InteractionModelError as error: return _ActionResult(status=_ActionStatus.ERROR, response=error) + return self.parse_raw_response(raw_resp) + + def parse_raw_response(self, raw_resp) -> _ActionResult: if self._possibly_unsupported and not raw_resp: # We have found an unsupported attribute. TestStep provided did specify that it might be # unsupported, so nothing left to validate. We just return a failure here. @@ -194,6 +211,83 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val) +class AttributeChangeAccumulator: + def __init__(self, name: str, expected_attribute: Clusters.ClusterAttributeDescriptor, + output_queue: queue.SimpleQueue): + self._name = name + self._expected_attribute = expected_attribute + self._output_queue = output_queue + + def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction): + if path.AttributeType == self._expected_attribute: + data = transaction.GetAttribute(path) + result = _ActionResult(status=_ActionStatus.SUCCESS, response=path.AttributeType(data)) + + item = _AttributeSubscriptionCallbackResult(self._name, path, result) + logging.debug( + f'Got subscription report on client {self.name} for {path.AttributeType}: {data}') + self._output_queue.put(item) + + @property + def name(self) -> str: + return self._name + + +class SubscribeAttributeAction(ReadAttributeAction): + '''Single subscribe attribute action to be executed.''' + + def __init__(self, test_step, cluster: str, context: _ExecutionContext): + '''Converts 'test_step' to subscribe attribute action that can execute with ChipDeviceCtrl. + + Args: + 'test_step': Step containing information required to run write attribute action. + 'cluster': Name of cluster write attribute action is targeting. + 'context': Contains test-wide common objects such as DataModelLookup instance. + Raises: + ParsingError: Raised if there is a benign error, and there is currently no + action to perform for this write attribute. + UnexpectedParsingError: Raised if there is an unexpected parsing error. + ''' + super().__init__(test_step, cluster, context) + self._context = context + if test_step.min_interval is None: + raise UnexpectedParsingError( + f'SubscribeAttribute action does not have min_interval {self.label}') + self._min_interval = test_step.min_interval + + if test_step.max_interval is None: + raise UnexpectedParsingError( + f'SubscribeAttribute action does not have max_interval {self.label}') + self._max_interval = test_step.max_interval + + def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: + try: + subscription = asyncio.run( + dev_ctrl.ReadAttribute(self._node_id, [(self._endpoint, self._request_object)], + reportInterval=(self._min_interval, self._max_interval), + keepSubscriptions=False)) + except chip.interaction_model.InteractionModelError as error: + return _ActionResult(status=_ActionStatus.ERROR, response=error) + + self._context.subscriptions.append(subscription) + output_queue = self._context.subscription_callback_result_queue.get(self._attribute_name, + None) + if output_queue is None: + output_queue = queue.SimpleQueue() + self._context.subscription_callback_result_queue[self._attribute_name] = output_queue + + while not output_queue.empty(): + output_queue.get(block=False) + + subscription_handler = AttributeChangeAccumulator(self.label, self._request_object, + output_queue) + + subscription.SetAttributeUpdateCallback(subscription_handler) + + raw_resp = subscription.GetAttributes() + return self.parse_raw_response(raw_resp) + + class WriteAttributeAction(BaseAction): '''Single write attribute action to be executed.''' @@ -258,6 +352,37 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: return _ActionResult(status=_ActionStatus.ERROR, response=None) +class WaitForReportAction(BaseAction): + '''Single WaitForReport action to be executed.''' + + def __init__(self, test_step, context: _ExecutionContext): + '''Converts 'test_step' to wait for report action. + + Args: + 'test_step': Step containing information required to run wait for report action. + 'context': Contains test-wide common objects such as DataModelLookup instance. + Raises: + UnexpectedParsingError: Raised if the expected queue does not exist. + ''' + super().__init__(test_step.label) + self._attribute_name = stringcase.pascalcase(test_step.attribute) + self._output_queue = context.subscription_callback_result_queue.get(self._attribute_name, + None) + if self._output_queue is None: + raise UnexpectedParsingError(f'Could not find output queue') + + def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: + try: + # While there should be a timeout here provided by the test, the current codegen version + # of YAML tests doesn't have a per test step timeout, only a global timeout for the + # entire test. For that reason we default to a 30 second timeout. + item = self._output_queue.get(block=True, timeout=30) + except queue.Empty: + return _ActionResult(status=_ActionStatus.ERROR, response=None) + + return item.result + + class ReplTestRunner: '''Test runner to encode/decode values from YAML test Parser for executing the TestStep. @@ -301,6 +426,25 @@ def _attribute_read_action_factory(self, test_step, cluster: str): except ParsingError: return None + def _attribute_subscribe_action_factory(self, test_step, cluster: str): + '''Creates subscribe attribute command from TestStep provided. + + Args: + 'test_step': Step containing information required to run subscribe attribute action. + 'cluster': Name of cluster write attribute action is targeting. + Returns: + SubscribeAttributeAction if 'test_step' is a valid subscribe attribute to be executed. + None if we were unable to use the provided 'test_step' for a known reason that is not + fatal to test execution. + ''' + try: + return SubscribeAttributeAction(test_step, cluster, self._context) + except ParsingError: + # TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this + # runner has matched parity of the codegen YAML test, this exception should be + # propogated. + return None + def _attribute_write_action_factory(self, test_step, cluster: str): '''Creates write attribute command TestStep. @@ -317,6 +461,15 @@ def _attribute_write_action_factory(self, test_step, cluster: str): except ParsingError: return None + def _wait_for_report_action_factory(self, test_step): + try: + return WaitForReportAction(test_step, self._context) + except ParsingError: + # TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this + # runner has matched parity of the codegen YAML test, this exception should be + # propogated. + return None + def encode(self, request) -> BaseAction: action = None cluster = request.cluster.replace(' ', '').replace('/', '') @@ -328,7 +481,13 @@ def encode(self, request) -> BaseAction: elif command == 'readAttribute': action = self._attribute_read_action_factory(request, cluster) elif command == 'readEvent': - action = self._event_read_action_factory(request, cluster) + # TODO need to implement _event_read_action_factory + # action = self._event_read_action_factory(request, cluster) + pass + elif command == 'subscribeAttribute': + action = self._attribute_subscribe_action_factory(request, cluster) + elif command == 'waitForReport': + action = self._wait_for_report_action_factory(request) else: action = self._invoke_action_factory(request, cluster) @@ -386,3 +545,7 @@ def decode(self, result: _ActionResult): def execute(self, action: BaseAction): return action.run_action(self._dev_ctrl) + + def shutdown(self): + for subscription in self._context.subscriptions: + subscription.Shutdown()