Skip to content

Commit

Permalink
Initial work to get attribute subscription for chip-repl yamltest run…
Browse files Browse the repository at this point in the history
…ner (project-chip#24242)

* Initial work to get attribute subscrition for chip-repl yamltest runner

* Address PR comments

* Restyle

* Restyle

* Restyle
  • Loading branch information
tehampson authored and David Lechner committed Mar 22, 2023
1 parent 0028700 commit d5ee884
Showing 1 changed file with 166 additions and 3 deletions.
169 changes: 166 additions & 3 deletions src/controller/python/chip/yaml/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

import asyncio as asyncio
import logging
import queue
from abc import ABC, abstractmethod
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum

import chip.interaction_model
import chip.yaml.format_converter as Converter
import stringcase
from chip import ChipDeviceCtrl
from chip.clusters.Attribute import AttributeStatus, ValueDecodeFailure
from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath, ValueDecodeFailure
from chip.yaml.errors import ParsingError, UnexpectedParsingError

from .data_model_lookup import *
Expand All @@ -44,11 +45,24 @@ class _ActionResult:
response: object


@dataclass
class _AttributeSubscriptionCallbackResult:
name: str
attribute_path: TypedAttributePath
result: _ActionResult


@dataclass
class _ExecutionContext:
''' Objects that is commonly passed around this file that are vital to test execution.'''
# Data model lookup to get python attribute, cluster, command object.
data_model_lookup: DataModelLookup = None
# List of subscriptions.
subscriptions: list = field(default_factory=list)
# The key is the attribute/event name, and the value is a queue of subscription callback results
# that been sent by device under test. For attribute subscription the queue is of type
# _AttributeSubscriptionCallbackResult.
subscription_callback_result_queue: dict = field(default_factory=dict)


class BaseAction(ABC):
Expand Down Expand Up @@ -175,6 +189,9 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)

return self.parse_raw_response(raw_resp)

def parse_raw_response(self, raw_resp) -> _ActionResult:
if self._possibly_unsupported and not raw_resp:
# We have found an unsupported attribute. TestStep provided did specify that it might be
# unsupported, so nothing left to validate. We just return a failure here.
Expand All @@ -194,6 +211,83 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val)


class AttributeChangeAccumulator:
def __init__(self, name: str, expected_attribute: Clusters.ClusterAttributeDescriptor,
output_queue: queue.SimpleQueue):
self._name = name
self._expected_attribute = expected_attribute
self._output_queue = output_queue

def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
if path.AttributeType == self._expected_attribute:
data = transaction.GetAttribute(path)
result = _ActionResult(status=_ActionStatus.SUCCESS, response=path.AttributeType(data))

item = _AttributeSubscriptionCallbackResult(self._name, path, result)
logging.debug(
f'Got subscription report on client {self.name} for {path.AttributeType}: {data}')
self._output_queue.put(item)

@property
def name(self) -> str:
return self._name


class SubscribeAttributeAction(ReadAttributeAction):
'''Single subscribe attribute action to be executed.'''

def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to subscribe attribute action that can execute with ChipDeviceCtrl.
Args:
'test_step': Step containing information required to run write attribute action.
'cluster': Name of cluster write attribute action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
ParsingError: Raised if there is a benign error, and there is currently no
action to perform for this write attribute.
UnexpectedParsingError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step, cluster, context)
self._context = context
if test_step.min_interval is None:
raise UnexpectedParsingError(
f'SubscribeAttribute action does not have min_interval {self.label}')
self._min_interval = test_step.min_interval

if test_step.max_interval is None:
raise UnexpectedParsingError(
f'SubscribeAttribute action does not have max_interval {self.label}')
self._max_interval = test_step.max_interval

def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
subscription = asyncio.run(
dev_ctrl.ReadAttribute(self._node_id, [(self._endpoint, self._request_object)],
reportInterval=(self._min_interval, self._max_interval),
keepSubscriptions=False))
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)

self._context.subscriptions.append(subscription)
output_queue = self._context.subscription_callback_result_queue.get(self._attribute_name,
None)
if output_queue is None:
output_queue = queue.SimpleQueue()
self._context.subscription_callback_result_queue[self._attribute_name] = output_queue

while not output_queue.empty():
output_queue.get(block=False)

subscription_handler = AttributeChangeAccumulator(self.label, self._request_object,
output_queue)

subscription.SetAttributeUpdateCallback(subscription_handler)

raw_resp = subscription.GetAttributes()
return self.parse_raw_response(raw_resp)


class WriteAttributeAction(BaseAction):
'''Single write attribute action to be executed.'''

Expand Down Expand Up @@ -258,6 +352,37 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
return _ActionResult(status=_ActionStatus.ERROR, response=None)


class WaitForReportAction(BaseAction):
'''Single WaitForReport action to be executed.'''

def __init__(self, test_step, context: _ExecutionContext):
'''Converts 'test_step' to wait for report action.
Args:
'test_step': Step containing information required to run wait for report action.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedParsingError: Raised if the expected queue does not exist.
'''
super().__init__(test_step.label)
self._attribute_name = stringcase.pascalcase(test_step.attribute)
self._output_queue = context.subscription_callback_result_queue.get(self._attribute_name,
None)
if self._output_queue is None:
raise UnexpectedParsingError(f'Could not find output queue')

def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
# While there should be a timeout here provided by the test, the current codegen version
# of YAML tests doesn't have a per test step timeout, only a global timeout for the
# entire test. For that reason we default to a 30 second timeout.
item = self._output_queue.get(block=True, timeout=30)
except queue.Empty:
return _ActionResult(status=_ActionStatus.ERROR, response=None)

return item.result


class ReplTestRunner:
'''Test runner to encode/decode values from YAML test Parser for executing the TestStep.
Expand Down Expand Up @@ -301,6 +426,25 @@ def _attribute_read_action_factory(self, test_step, cluster: str):
except ParsingError:
return None

def _attribute_subscribe_action_factory(self, test_step, cluster: str):
'''Creates subscribe attribute command from TestStep provided.
Args:
'test_step': Step containing information required to run subscribe attribute action.
'cluster': Name of cluster write attribute action is targeting.
Returns:
SubscribeAttributeAction if 'test_step' is a valid subscribe attribute to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
try:
return SubscribeAttributeAction(test_step, cluster, self._context)
except ParsingError:
# TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
# runner has matched parity of the codegen YAML test, this exception should be
# propogated.
return None

def _attribute_write_action_factory(self, test_step, cluster: str):
'''Creates write attribute command TestStep.
Expand All @@ -317,6 +461,15 @@ def _attribute_write_action_factory(self, test_step, cluster: str):
except ParsingError:
return None

def _wait_for_report_action_factory(self, test_step):
try:
return WaitForReportAction(test_step, self._context)
except ParsingError:
# TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
# runner has matched parity of the codegen YAML test, this exception should be
# propogated.
return None

def encode(self, request) -> BaseAction:
action = None
cluster = request.cluster.replace(' ', '').replace('/', '')
Expand All @@ -328,7 +481,13 @@ def encode(self, request) -> BaseAction:
elif command == 'readAttribute':
action = self._attribute_read_action_factory(request, cluster)
elif command == 'readEvent':
action = self._event_read_action_factory(request, cluster)
# TODO need to implement _event_read_action_factory
# action = self._event_read_action_factory(request, cluster)
pass
elif command == 'subscribeAttribute':
action = self._attribute_subscribe_action_factory(request, cluster)
elif command == 'waitForReport':
action = self._wait_for_report_action_factory(request)
else:
action = self._invoke_action_factory(request, cluster)

Expand Down Expand Up @@ -386,3 +545,7 @@ def decode(self, result: _ActionResult):

def execute(self, action: BaseAction):
return action.run_action(self._dev_ctrl)

def shutdown(self):
for subscription in self._context.subscriptions:
subscription.Shutdown()

0 comments on commit d5ee884

Please sign in to comment.