From 04b9f675ddc114c5ea6606b70223ec2e32e4b42f Mon Sep 17 00:00:00 2001 From: Julien THOMAS Date: Wed, 6 Jul 2022 09:47:40 +0200 Subject: [PATCH] feat: ComAux ability to clear receiving buffer --- docs/whats_new/version_ongoing.rst | 15 +++++ examples/templates/suite_com/test_com.py | 34 +++++++++++- .../auxiliaries/communication_auxiliary.py | 55 ++++++++++++++++++- .../communication_auxiliary.py | 17 +++++- tests/test_com_aux.py | 32 ++++++++++- tests/test_robot_com_auxiliary.py | 8 +++ 6 files changed, 152 insertions(+), 9 deletions(-) diff --git a/docs/whats_new/version_ongoing.rst b/docs/whats_new/version_ongoing.rst index 1923e3a2d..af94ec598 100644 --- a/docs/whats_new/version_ongoing.rst +++ b/docs/whats_new/version_ongoing.rst @@ -36,6 +36,21 @@ DUT Auxiliary adaption refactor/redesign of the device under test auxiliary to fit with the brand new double threaded auxiliary interface + Record Auxiliary adaption ^^^^^^^^^^^^^^^^^^^^^^^^^ adapt the record auxiliary to fit with the brand new double threaded auxiliary interface + +Communication Auxiliary +^^^^^^^^^^^^^^^^^^^^^^^ +Communication auxiliary does not record message automatically anymore, +context manager has to be used on the send_message API of communication +auxiliary. It will then set an event that allow recording, and clear it when +getting out of the context manager. + +See :ref:`examples/templates/suite_com/test_com.py` + +Communication auxiliary now offers the possibility to clear old message +from the receiving buffer, before getting new one. + +See :ref:`communication_auxiliary` diff --git a/examples/templates/suite_com/test_com.py b/examples/templates/suite_com/test_com.py index 6639abc54..7c50cc4f3 100644 --- a/examples/templates/suite_com/test_com.py +++ b/examples/templates/suite_com/test_com.py @@ -55,12 +55,14 @@ def setUp(self): def test_run(self): """Thanks to the usage of dev cc_raw_loopback, let's try to send a message and receive it. + #poetry run pykiso -c examples/templates/com_aux.yaml """ logging.info( f"--------------- RUN: {self.test_suite_id}, {self.test_case_id} ---------------" ) - # send 20 requests over the connected channel and check if the - # command was successfully sent + + # send 20 requests (without context manager) over the connected channel + # and check if the command was successfully sent for _ in range(20): req = b"\x02\x04\x06" logging.info(f"send request {req} over {com_aux.name}") @@ -68,12 +70,40 @@ def test_run(self): logging.info(f"request excecution state: {state}") self.assertEqual(state, True) + # check messages have not been queued in as + # context manager wasn't used + self.assertTrue(com_aux.queue_out.empty()) + + # send 20 requests (with the context manager over the connected channel + # and check if the command was successfully sent + with com_aux.collect_messages(): + for _ in range(20): + req = b"\x02\x04\x06" + logging.info(f"send request {req} over {com_aux.name}") + state = com_aux.send_message(req) + logging.info(f"request excecution state: {state}") + self.assertEqual(state, True) + + # check messages have been queued in as + # context manager was used + self.assertFalse(com_aux.queue_out.empty()) + # get the 20 first received messages for _ in range(20): response = com_aux.receive_message() logging.info(f"received data {response}") self.assertEqual(response, b"\x02\x04\x06") + # sending one request just to make sure queue is not empty + with com_aux.collect_messages(): + req = b"\x02\x04\x06" + com_aux.send_message(req) + + # check queue clearing process + self.assertFalse(com_aux.queue_out.empty()) + com_aux.clear_buffer() + self.assertTrue(com_aux.queue_out.empty()) + def tearDown(self): """If a fixture is not use just override it like below.""" logging.info( diff --git a/src/pykiso/lib/auxiliaries/communication_auxiliary.py b/src/pykiso/lib/auxiliaries/communication_auxiliary.py index de06c4e9c..b08100ecf 100644 --- a/src/pykiso/lib/auxiliaries/communication_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/communication_auxiliary.py @@ -20,8 +20,13 @@ """ +from __future__ import annotations + +import functools import logging import queue +import threading +from contextlib import ContextDecorator from typing import Any, Optional from pykiso import CChannel, Message @@ -34,6 +39,28 @@ log = logging.getLogger(__name__) +class _collect_messages(ContextDecorator): + """Context manager and decorator for the communication auxiliary + allowing messages collection (putting them in the queue). + """ + + def __init__(self, com_aux: CommunicationAuxiliary): + """Constructor used to inherit some of communication auxiliary + features. + """ + self.com_aux = com_aux + + def __enter__(self): + """Set the queue event to allow messages collection.""" + log.info("Start queuing received messages.") + self.com_aux.queueing_event.set() + + def __exit__(self, *exc): + """Clear queue event to stop messages collection.""" + log.info("Stop queuing received messages.") + self.com_aux.queueing_event.clear() + + class CommunicationAuxiliary(DTAuxiliaryInterface): """Auxiliary used to send raw bytes via a connector instead of pykiso.Messages.""" @@ -47,6 +74,8 @@ def __init__(self, com: CChannel, **kwargs: dict) -> None: ) self.channel = com self.queue_tx = queue.Queue() + self.queueing_event = threading.Event() + self.collect_messages = functools.partial(_collect_messages, com_aux=self) @open_connector def _create_auxiliary_instance(self) -> bool: @@ -114,19 +143,32 @@ def run_command( return state def receive_message( - self, blocking: bool = True, timeout_in_s: float = None + self, + blocking: bool = True, + timeout_in_s: float = None, + clear_buffer: bool = False, ) -> Optional[bytes]: """Receive a raw message. :param blocking: wait for message till timeout elapses? :param timeout_in_s: maximum time in second to wait for a response + :param clear_buffer: clear all previously received messages that were + not fetched by this method :returns: raw message """ + if self.queue_out.empty(): + log.error("Context manager not used. No messages in queue.") + return None + log.debug( f"retrieving message in {self} (blocking={blocking}, timeout={timeout_in_s})" ) + if clear_buffer: + self.clear_buffer() + response = self.wait_for_queue_out(blocking=blocking, timeout_in_s=timeout_in_s) + log.debug(f"retrieved message '{response}' in {self}") # if queue.Empty exception is raised None is returned so just @@ -142,6 +184,12 @@ def receive_message( return (msg, remote_id) return msg + def clear_buffer(self) -> None: + """Clear buffer from old stacked objects""" + log.info("Clearing buffer. Previous responses will be deleted.") + with self.queue_out.mutex: + self.queue_out.queue.clear() + def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool: """Run the corresponding command. @@ -167,7 +215,8 @@ def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool: self.queue_tx.put(state) def _receive_message(self, timeout_in_s: float) -> bytes: - """Get a message from the associated channel. + """Get a message from the associated channel. And put the message in + the queue, if threading event is set. :param timeout_in_s: maximum amount of time (seconds) to wait for a message @@ -177,7 +226,7 @@ def _receive_message(self, timeout_in_s: float) -> bytes: try: rcv_data = self.channel.cc_receive(timeout=timeout_in_s, raw=True) msg = rcv_data.get("msg") - if msg is not None: + if msg is not None and self.queueing_event.is_set(): log.debug(f"received message '{rcv_data}' from {self.channel}") self.queue_out.put(rcv_data) except Exception: diff --git a/src/pykiso/lib/robot_framework/communication_auxiliary.py b/src/pykiso/lib/robot_framework/communication_auxiliary.py index 36668ecc0..4596b4f06 100644 --- a/src/pykiso/lib/robot_framework/communication_auxiliary.py +++ b/src/pykiso/lib/robot_framework/communication_auxiliary.py @@ -60,9 +60,20 @@ def send_message( logger.info(f"send message {raw_msg} using {aux_alias}") return state + @keyword(name="Clear Buffer") + def clear_buffer(self, aux_alias: str) -> None: + """Clear buffer from old stacked objects""" + + aux = self._get_aux(aux_alias) + aux.clear_buffer() + @keyword(name="Receive message") def receive_message( - self, aux_alias: str, blocking: bool = True, timeout_in_s: float = None + self, + aux_alias: str, + blocking: bool = True, + timeout_in_s: float = None, + clear_old: bool = True, ) -> Union[list, Tuple[list, int]]: """Return a raw received message from the queue. @@ -70,6 +81,8 @@ def receive_message( :param blocking: wait for message till timeout elapses? :param timeout_in_s: maximum time in second to wait for a response + :param clear_old: clear all previously received messages that were + not fetched by this method :returns: raw message and source (return type could be different depending on the associated channel) @@ -78,7 +91,7 @@ def receive_message( aux = self._get_aux(aux_alias) source = None msg = [] - recv_msg = aux.receive_message(blocking, timeout_in_s) + recv_msg = aux.receive_message(blocking, timeout_in_s, clear_old) try: if recv_msg is not None: msg, source = recv_msg diff --git a/tests/test_com_aux.py b/tests/test_com_aux.py index b30c24cd6..9b196b8e1 100644 --- a/tests/test_com_aux.py +++ b/tests/test_com_aux.py @@ -56,7 +56,10 @@ def test_com_aux_messaging(com_aux_linker, caplog): from pykiso.auxiliaries import com_aux msg = b"test" - assert com_aux.send_message(msg) + + with com_aux.collect_messages(): + assert com_aux.send_message(msg) + with caplog.at_level(logging.DEBUG): rec_msg = com_aux.receive_message() assert rec_msg == msg @@ -67,6 +70,17 @@ def test_com_aux_messaging(com_aux_linker, caplog): ) +def test_com_aux_messaging_without_cm(com_aux_linker, caplog): + from pykiso.auxiliaries import com_aux + + msg = b"test" + com_aux.queue_tx.put(True) + com_aux.send_message(msg) + rec_msg = com_aux.receive_message() + + assert rec_msg is None + + def test_create_auxiliary_instance(com_aux_inst): state = com_aux_inst._create_auxiliary_instance() @@ -133,7 +147,7 @@ def test__run_command_unknown_command(com_aux_inst): def test_receive_message_with_remote_id(mocker, com_aux_inst): ret = {"msg": b"\x01", "remote_id": 0x123} - mocker.patch.object(com_aux_inst, "wait_for_queue_out", return_value=ret) + com_aux_inst.queue_out.put(ret) recv = com_aux_inst.receive_message() assert recv == (ret["msg"], ret["remote_id"]) @@ -146,6 +160,20 @@ def test_receive_message_none(mocker, com_aux_inst): assert recv is None +@pytest.mark.parametrize("clear_buffer", [True, False]) +def test_receive_message_clear_buffer(mocker, com_aux_inst, clear_buffer): + com_aux_inst.queue_out = queue.Queue() + com_aux_inst.queue_out.put({"msg": b"\x01", "remote_id": 0x123}) + mocker.patch.object(com_aux_inst, "wait_for_queue_out") + + com_aux_inst.receive_message(clear_buffer=clear_buffer) + + if clear_buffer: + assert com_aux_inst.queue_out.empty() + else: + assert not com_aux_inst.queue_out.empty() + + def test__receive_message_exception(mocker, com_aux_inst): mocker.patch.object(com_aux_inst.channel, "cc_receive", side_effect=ValueError) diff --git a/tests/test_robot_com_auxiliary.py b/tests/test_robot_com_auxiliary.py index 611fca191..bda027215 100644 --- a/tests/test_robot_com_auxiliary.py +++ b/tests/test_robot_com_auxiliary.py @@ -36,6 +36,14 @@ def test_send_message(mocker, communication_aux_instance): send_msg_mock.assert_called_once() +def test_clear_buffer(mocker, communication_aux_instance): + clear_buffer_mock = mocker.patch.object(ComAux, "clear_buffer") + + communication_aux_instance.clear_buffer("itf_com_aux") + + clear_buffer_mock.assert_called_once() + + def test_get_message_without_source(mocker, communication_aux_instance): receive_msg_mock = mocker.patch.object( ComAux, "receive_message", return_value=b"\x01\x02\x03"