Skip to content

Commit

Permalink
feat: ComAux ability to clear receiving buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien THOMAS committed Jul 14, 2022
1 parent 59c312d commit a16503f
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 11 deletions.
14 changes: 13 additions & 1 deletion docs/whats_new/version_ongoing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,16 @@ Incompatibilities with the agnostic proxy are now resolved. You should be able t

RTT connector log folder creation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RTT connector now creates a log folder if it does not exist instead of throwing an error
RTT connector now creates a log folder if it does not exist instead of throwing an error

Communication Auxiliary
^^^^^^^^^^^^^^^^^^^^^^^
To save on memory, the communication auxiliary does not record messages automatically anymore,
The functionality is now available with a`with comaux.collect_messages()`.

See :ref:`examples/templates/suite_com/test_com.py`

Communication auxiliary now offers the possibility to clear queue
to ensure a proper queue with API : :ref:`clear_buffer`

See :ref:`communication_auxiliary`
33 changes: 31 additions & 2 deletions examples/templates/suite_com/test_com.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,50 @@ def test_run(self):
logging.info(
f"--------------- RUN: {self.test_suite_id}, {self.test_case_id} ---------------"
)
# send 20 requests over the connected channel and check if the
# command was successfully sent

# send 20 requests (without context manager) over the connected channel
# and check if the command was successfully sent
for _ in range(20):
req = b"\x02\x04\x06"
logging.info(f"send request {req} over {com_aux.name}")
state = com_aux.send_message(req)
logging.info(f"request excecution state: {state}")
self.assertEqual(state, True)

# check messages have not been queued in as
# context manager wasn't used
self.assertTrue(com_aux.queue_out.empty())

# send 20 requests (with the context manager over the connected channel
# and check if the command was successfully sent
with com_aux.collect_messages():
for _ in range(20):
req = b"\x02\x04\x06"
logging.info(f"send request {req} over {com_aux.name}")
state = com_aux.send_message(req)
logging.info(f"request excecution state: {state}")
self.assertEqual(state, True)

# check messages have been queued in as
# context manager was used
self.assertFalse(com_aux.queue_out.empty())

# get the 20 first received messages
for _ in range(20):
response = com_aux.receive_message()
logging.info(f"received data {response}")
self.assertEqual(response, b"\x02\x04\x06")

# sending one request just to make sure queue is not empty
with com_aux.collect_messages():
req = b"\x02\x04\x06"
com_aux.send_message(req)

# check queue clearing process
self.assertFalse(com_aux.queue_out.empty())
com_aux.clear_buffer()
self.assertTrue(com_aux.queue_out.empty())

def tearDown(self):
"""If a fixture is not use just override it like below."""
logging.info(
Expand Down
50 changes: 47 additions & 3 deletions src/pykiso/lib/auxiliaries/communication_auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
"""
from __future__ import annotations

import functools
import logging
import queue
import threading
from contextlib import ContextDecorator
from typing import Any, Optional

from pykiso import CChannel, Message
Expand All @@ -34,6 +39,28 @@
log = logging.getLogger(__name__)


class _collect_messages(ContextDecorator):
"""Context manager and decorator for the communication auxiliary
allowing messages collection (putting them in the queue).
"""

def __init__(self, com_aux: CommunicationAuxiliary):
"""Constructor used to inherit some of communication auxiliary
features.
"""
self.com_aux = com_aux

def __enter__(self):
"""Set the queue event to allow messages collection."""
log.info("Start queuing received messages.")
self.com_aux.queueing_event.set()

def __exit__(self, *exc):
"""Clear queue event to stop messages collection."""
log.info("Stop queuing received messages.")
self.com_aux.queueing_event.clear()


class CommunicationAuxiliary(DTAuxiliaryInterface):
"""Auxiliary used to send raw bytes via a connector instead of pykiso.Messages."""

Expand All @@ -47,6 +74,8 @@ def __init__(self, com: CChannel, **kwargs: dict) -> None:
)
self.channel = com
self.queue_tx = queue.Queue()
self.queueing_event = threading.Event()
self.collect_messages = functools.partial(_collect_messages, com_aux=self)

@open_connector
def _create_auxiliary_instance(self) -> bool:
Expand Down Expand Up @@ -114,7 +143,9 @@ def run_command(
return state

def receive_message(
self, blocking: bool = True, timeout_in_s: float = None
self,
blocking: bool = True,
timeout_in_s: float = None,
) -> Optional[bytes]:
"""Receive a raw message.
Expand All @@ -123,10 +154,16 @@ def receive_message(
:returns: raw message
"""
if self.queue_out.empty():
log.error("Context manager not used. No messages in queue.")
return None

log.debug(
f"retrieving message in {self} (blocking={blocking}, timeout={timeout_in_s})"
)

response = self.wait_for_queue_out(blocking=blocking, timeout_in_s=timeout_in_s)

log.debug(f"retrieved message '{response}' in {self}")

# if queue.Empty exception is raised None is returned so just
Expand All @@ -142,6 +179,12 @@ def receive_message(
return (msg, remote_id)
return msg

def clear_buffer(self) -> None:
"""Clear buffer from old stacked objects"""
log.info("Clearing buffer. Previous responses will be deleted.")
with self.queue_out.mutex:
self.queue_out.queue.clear()

def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool:
"""Run the corresponding command.
Expand All @@ -167,7 +210,8 @@ def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool:
self.queue_tx.put(state)

def _receive_message(self, timeout_in_s: float) -> bytes:
"""Get a message from the associated channel.
"""Get a message from the associated channel. And put the message in
the queue, if threading event is set.
:param timeout_in_s: maximum amount of time (seconds) to wait
for a message
Expand All @@ -177,7 +221,7 @@ def _receive_message(self, timeout_in_s: float) -> bytes:
try:
rcv_data = self.channel.cc_receive(timeout=timeout_in_s, raw=True)
msg = rcv_data.get("msg")
if msg is not None:
if msg is not None and self.queueing_event.is_set():
log.debug(f"received message '{rcv_data}' from {self.channel}")
self.queue_out.put(rcv_data)
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion src/pykiso/lib/connectors/cc_socket_can/cc_socket_can.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import platform
import time
from pathlib import Path
from typing import Union, Dict
from typing import Dict, Union

import can
import can.bus
Expand Down
27 changes: 25 additions & 2 deletions src/pykiso/lib/robot_framework/communication_auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
.. currentmodule:: communication_auxiliary
"""

import threading
from typing import Tuple, Union

from robot.api import logger
Expand All @@ -40,6 +40,7 @@ class CommunicationAuxiliary(RobotAuxInterface):
def __init__(self):
"""Initialize attributes."""
super().__init__(aux_type=ComAux)
self.queueing_event = threading.Event()

@keyword(name="Send message")
def send_message(
Expand All @@ -60,9 +61,31 @@ def send_message(
logger.info(f"send message {raw_msg} using {aux_alias}")
return state

@keyword(name="Start Recording")
def start_recording(self, aux_alias: str) -> None:
"""Start recording com_aux messages"""

self.queueing_event.set()

@keyword(name="Stop Recording")
def stop_recording(self, aux_alias: str) -> None:
"""Stop recording com_aux messages"""

self.queueing_event.clear()

@keyword(name="Clear Buffer")
def clear_buffer(self, aux_alias: str) -> None:
"""Clear buffer from old stacked objects"""

aux = self._get_aux(aux_alias)
aux.clear_buffer()

@keyword(name="Receive message")
def receive_message(
self, aux_alias: str, blocking: bool = True, timeout_in_s: float = None
self,
aux_alias: str,
blocking: bool = True,
timeout_in_s: float = None,
) -> Union[list, Tuple[list, int]]:
"""Return a raw received message from the queue.
Expand Down
26 changes: 24 additions & 2 deletions tests/test_com_aux.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def test_com_aux_messaging(com_aux_linker, caplog):
from pykiso.auxiliaries import com_aux

msg = b"test"
assert com_aux.send_message(msg)

with com_aux.collect_messages():
assert com_aux.send_message(msg)

with caplog.at_level(logging.DEBUG):
rec_msg = com_aux.receive_message()
assert rec_msg == msg
Expand All @@ -67,6 +70,17 @@ def test_com_aux_messaging(com_aux_linker, caplog):
)


def test_com_aux_messaging_without_cm(com_aux_linker, caplog):
from pykiso.auxiliaries import com_aux

msg = b"test"
com_aux.queue_tx.put(True)
com_aux.send_message(msg)
rec_msg = com_aux.receive_message()

assert rec_msg is None


def test_create_auxiliary_instance(com_aux_inst):
state = com_aux_inst._create_auxiliary_instance()

Expand Down Expand Up @@ -133,7 +147,7 @@ def test__run_command_unknown_command(com_aux_inst):
def test_receive_message_with_remote_id(mocker, com_aux_inst):
ret = {"msg": b"\x01", "remote_id": 0x123}

mocker.patch.object(com_aux_inst, "wait_for_queue_out", return_value=ret)
com_aux_inst.queue_out.put(ret)
recv = com_aux_inst.receive_message()

assert recv == (ret["msg"], ret["remote_id"])
Expand All @@ -153,3 +167,11 @@ def test__receive_message_exception(mocker, com_aux_inst):

with pytest.raises(queue.Empty):
com_aux_inst.queue_out.get_nowait()


def test_clear_buffer(com_aux_inst):
com_aux_inst.queue_out.put(b"test")

com_aux_inst.clear_buffer()

assert com_aux_inst.queue_out.empty()
18 changes: 18 additions & 0 deletions tests/test_robot_com_auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ def test_send_message(mocker, communication_aux_instance):
send_msg_mock.assert_called_once()


def test_clear_buffer(mocker, communication_aux_instance):
clear_buffer_mock = mocker.patch.object(ComAux, "clear_buffer")

communication_aux_instance.clear_buffer("itf_com_aux")

clear_buffer_mock.assert_called_once()


def test_start_stop_recording(communication_aux_instance):
communication_aux_instance.start_recording("itf_com_aux")
flag = communication_aux_instance.queueing_event.is_set()
assert flag is True

communication_aux_instance.stop_recording("itf_com_aux")
flag = communication_aux_instance.queueing_event.is_set()
assert flag is False


def test_get_message_without_source(mocker, communication_aux_instance):
receive_msg_mock = mocker.patch.object(
ComAux, "receive_message", return_value=b"\x01\x02\x03"
Expand Down

0 comments on commit a16503f

Please sign in to comment.