Skip to content

Commit

Permalink
feat: ComAux ability to clear receiving buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien THOMAS committed Jul 11, 2022
1 parent 9a36fae commit 04b9f67
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 9 deletions.
15 changes: 15 additions & 0 deletions docs/whats_new/version_ongoing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ DUT Auxiliary adaption
refactor/redesign of the device under test auxiliary to fit with the brand new double
threaded auxiliary interface


Record Auxiliary adaption
^^^^^^^^^^^^^^^^^^^^^^^^^
adapt the record auxiliary to fit with the brand new double threaded auxiliary interface

Communication Auxiliary
^^^^^^^^^^^^^^^^^^^^^^^
Communication auxiliary does not record message automatically anymore,
context manager has to be used on the send_message API of communication
auxiliary. It will then set an event that allow recording, and clear it when
getting out of the context manager.

See :ref:`examples/templates/suite_com/test_com.py`

Communication auxiliary now offers the possibility to clear old message
from the receiving buffer, before getting new one.

See :ref:`communication_auxiliary`
34 changes: 32 additions & 2 deletions examples/templates/suite_com/test_com.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,55 @@ def setUp(self):
def test_run(self):
"""Thanks to the usage of dev cc_raw_loopback, let's try to send
a message and receive it.
#poetry run pykiso -c examples/templates/com_aux.yaml
"""
logging.info(
f"--------------- RUN: {self.test_suite_id}, {self.test_case_id} ---------------"
)
# send 20 requests over the connected channel and check if the
# command was successfully sent

# send 20 requests (without context manager) over the connected channel
# and check if the command was successfully sent
for _ in range(20):
req = b"\x02\x04\x06"
logging.info(f"send request {req} over {com_aux.name}")
state = com_aux.send_message(req)
logging.info(f"request excecution state: {state}")
self.assertEqual(state, True)

# check messages have not been queued in as
# context manager wasn't used
self.assertTrue(com_aux.queue_out.empty())

# send 20 requests (with the context manager over the connected channel
# and check if the command was successfully sent
with com_aux.collect_messages():
for _ in range(20):
req = b"\x02\x04\x06"
logging.info(f"send request {req} over {com_aux.name}")
state = com_aux.send_message(req)
logging.info(f"request excecution state: {state}")
self.assertEqual(state, True)

# check messages have been queued in as
# context manager was used
self.assertFalse(com_aux.queue_out.empty())

# get the 20 first received messages
for _ in range(20):
response = com_aux.receive_message()
logging.info(f"received data {response}")
self.assertEqual(response, b"\x02\x04\x06")

# sending one request just to make sure queue is not empty
with com_aux.collect_messages():
req = b"\x02\x04\x06"
com_aux.send_message(req)

# check queue clearing process
self.assertFalse(com_aux.queue_out.empty())
com_aux.clear_buffer()
self.assertTrue(com_aux.queue_out.empty())

def tearDown(self):
"""If a fixture is not use just override it like below."""
logging.info(
Expand Down
55 changes: 52 additions & 3 deletions src/pykiso/lib/auxiliaries/communication_auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
"""
from __future__ import annotations

import functools
import logging
import queue
import threading
from contextlib import ContextDecorator
from typing import Any, Optional

from pykiso import CChannel, Message
Expand All @@ -34,6 +39,28 @@
log = logging.getLogger(__name__)


class _collect_messages(ContextDecorator):
"""Context manager and decorator for the communication auxiliary
allowing messages collection (putting them in the queue).
"""

def __init__(self, com_aux: CommunicationAuxiliary):
"""Constructor used to inherit some of communication auxiliary
features.
"""
self.com_aux = com_aux

def __enter__(self):
"""Set the queue event to allow messages collection."""
log.info("Start queuing received messages.")
self.com_aux.queueing_event.set()

def __exit__(self, *exc):
"""Clear queue event to stop messages collection."""
log.info("Stop queuing received messages.")
self.com_aux.queueing_event.clear()


class CommunicationAuxiliary(DTAuxiliaryInterface):
"""Auxiliary used to send raw bytes via a connector instead of pykiso.Messages."""

Expand All @@ -47,6 +74,8 @@ def __init__(self, com: CChannel, **kwargs: dict) -> None:
)
self.channel = com
self.queue_tx = queue.Queue()
self.queueing_event = threading.Event()
self.collect_messages = functools.partial(_collect_messages, com_aux=self)

@open_connector
def _create_auxiliary_instance(self) -> bool:
Expand Down Expand Up @@ -114,19 +143,32 @@ def run_command(
return state

def receive_message(
self, blocking: bool = True, timeout_in_s: float = None
self,
blocking: bool = True,
timeout_in_s: float = None,
clear_buffer: bool = False,
) -> Optional[bytes]:
"""Receive a raw message.
:param blocking: wait for message till timeout elapses?
:param timeout_in_s: maximum time in second to wait for a response
:param clear_buffer: clear all previously received messages that were
not fetched by this method
:returns: raw message
"""
if self.queue_out.empty():
log.error("Context manager not used. No messages in queue.")
return None

log.debug(
f"retrieving message in {self} (blocking={blocking}, timeout={timeout_in_s})"
)
if clear_buffer:
self.clear_buffer()

response = self.wait_for_queue_out(blocking=blocking, timeout_in_s=timeout_in_s)

log.debug(f"retrieved message '{response}' in {self}")

# if queue.Empty exception is raised None is returned so just
Expand All @@ -142,6 +184,12 @@ def receive_message(
return (msg, remote_id)
return msg

def clear_buffer(self) -> None:
"""Clear buffer from old stacked objects"""
log.info("Clearing buffer. Previous responses will be deleted.")
with self.queue_out.mutex:
self.queue_out.queue.clear()

def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool:
"""Run the corresponding command.
Expand All @@ -167,7 +215,8 @@ def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool:
self.queue_tx.put(state)

def _receive_message(self, timeout_in_s: float) -> bytes:
"""Get a message from the associated channel.
"""Get a message from the associated channel. And put the message in
the queue, if threading event is set.
:param timeout_in_s: maximum amount of time (seconds) to wait
for a message
Expand All @@ -177,7 +226,7 @@ def _receive_message(self, timeout_in_s: float) -> bytes:
try:
rcv_data = self.channel.cc_receive(timeout=timeout_in_s, raw=True)
msg = rcv_data.get("msg")
if msg is not None:
if msg is not None and self.queueing_event.is_set():
log.debug(f"received message '{rcv_data}' from {self.channel}")
self.queue_out.put(rcv_data)
except Exception:
Expand Down
17 changes: 15 additions & 2 deletions src/pykiso/lib/robot_framework/communication_auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,29 @@ def send_message(
logger.info(f"send message {raw_msg} using {aux_alias}")
return state

@keyword(name="Clear Buffer")
def clear_buffer(self, aux_alias: str) -> None:
"""Clear buffer from old stacked objects"""

aux = self._get_aux(aux_alias)
aux.clear_buffer()

@keyword(name="Receive message")
def receive_message(
self, aux_alias: str, blocking: bool = True, timeout_in_s: float = None
self,
aux_alias: str,
blocking: bool = True,
timeout_in_s: float = None,
clear_old: bool = True,
) -> Union[list, Tuple[list, int]]:
"""Return a raw received message from the queue.
:param aux_alias: auxiliary's alias
:param blocking: wait for message till timeout elapses?
:param timeout_in_s: maximum time in second to wait for a
response
:param clear_old: clear all previously received messages that were
not fetched by this method
:returns: raw message and source (return type could be different
depending on the associated channel)
Expand All @@ -78,7 +91,7 @@ def receive_message(
aux = self._get_aux(aux_alias)
source = None
msg = []
recv_msg = aux.receive_message(blocking, timeout_in_s)
recv_msg = aux.receive_message(blocking, timeout_in_s, clear_old)
try:
if recv_msg is not None:
msg, source = recv_msg
Expand Down
32 changes: 30 additions & 2 deletions tests/test_com_aux.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def test_com_aux_messaging(com_aux_linker, caplog):
from pykiso.auxiliaries import com_aux

msg = b"test"
assert com_aux.send_message(msg)

with com_aux.collect_messages():
assert com_aux.send_message(msg)

with caplog.at_level(logging.DEBUG):
rec_msg = com_aux.receive_message()
assert rec_msg == msg
Expand All @@ -67,6 +70,17 @@ def test_com_aux_messaging(com_aux_linker, caplog):
)


def test_com_aux_messaging_without_cm(com_aux_linker, caplog):
from pykiso.auxiliaries import com_aux

msg = b"test"
com_aux.queue_tx.put(True)
com_aux.send_message(msg)
rec_msg = com_aux.receive_message()

assert rec_msg is None


def test_create_auxiliary_instance(com_aux_inst):
state = com_aux_inst._create_auxiliary_instance()

Expand Down Expand Up @@ -133,7 +147,7 @@ def test__run_command_unknown_command(com_aux_inst):
def test_receive_message_with_remote_id(mocker, com_aux_inst):
ret = {"msg": b"\x01", "remote_id": 0x123}

mocker.patch.object(com_aux_inst, "wait_for_queue_out", return_value=ret)
com_aux_inst.queue_out.put(ret)
recv = com_aux_inst.receive_message()

assert recv == (ret["msg"], ret["remote_id"])
Expand All @@ -146,6 +160,20 @@ def test_receive_message_none(mocker, com_aux_inst):
assert recv is None


@pytest.mark.parametrize("clear_buffer", [True, False])
def test_receive_message_clear_buffer(mocker, com_aux_inst, clear_buffer):
com_aux_inst.queue_out = queue.Queue()
com_aux_inst.queue_out.put({"msg": b"\x01", "remote_id": 0x123})
mocker.patch.object(com_aux_inst, "wait_for_queue_out")

com_aux_inst.receive_message(clear_buffer=clear_buffer)

if clear_buffer:
assert com_aux_inst.queue_out.empty()
else:
assert not com_aux_inst.queue_out.empty()


def test__receive_message_exception(mocker, com_aux_inst):
mocker.patch.object(com_aux_inst.channel, "cc_receive", side_effect=ValueError)

Expand Down
8 changes: 8 additions & 0 deletions tests/test_robot_com_auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ def test_send_message(mocker, communication_aux_instance):
send_msg_mock.assert_called_once()


def test_clear_buffer(mocker, communication_aux_instance):
clear_buffer_mock = mocker.patch.object(ComAux, "clear_buffer")

communication_aux_instance.clear_buffer("itf_com_aux")

clear_buffer_mock.assert_called_once()


def test_get_message_without_source(mocker, communication_aux_instance):
receive_msg_mock = mocker.patch.object(
ComAux, "receive_message", return_value=b"\x01\x02\x03"
Expand Down

0 comments on commit 04b9f67

Please sign in to comment.