diff --git a/docs/advanced_usage/advanced_config_file.rst b/docs/advanced_usage/advanced_config_file.rst index 00041bf6..3c17ed19 100644 --- a/docs/advanced_usage/advanced_config_file.rst +++ b/docs/advanced_usage/advanced_config_file.rst @@ -131,6 +131,12 @@ that will have exclusive access to the communication channel. A the created :py:class:`~pykiso.lib.auxiliaries.proxy_auxiliary.ProxyAuxiliary`. This allows to keep a minimalistic :py:class:`~pykiso.connector.CChannel` implementation. +Access to attributes and methods of the defined communication channel that has been +attached to the created :py:class:`~pykiso.lib.auxiliaries.proxy_auxiliary.ProxyAuxiliary` +is still possible, but keep in mind that if one auxiliary modifies one the communication +channel's attributes, every other auxiliary sharing this communication channel will be +affected by this change. + An illustration of the resulting internal setup can be found at :ref:`proxy_aux`. In other words, if you define the following YAML configuration file: diff --git a/docs/advanced_usage/how_to_connector.rst b/docs/advanced_usage/how_to_connector.rst index fed72810..42630d60 100644 --- a/docs/advanced_usage/how_to_connector.rst +++ b/docs/advanced_usage/how_to_connector.rst @@ -35,11 +35,9 @@ This interface enforces the implementation of the following methods: - :py:meth:`~pykiso.connector.CChannel._cc_close`: close the communication. Does not take any argument. - :py:meth:`~pykiso.connector.CChannel._cc_send`: send data if the communication is open. - Requires one positional argument ``msg`` and one keyword argument ``raw``, used to serialize the data - before sending it. + Requires one positional argument ``msg`` . - :py:meth:`~pykiso.connector.CChannel._cc_receive`: receive data if the communication is open. - Requires one positional argument ``timeout`` and one keyword argument ``raw``, used to deserialize - the data when receiving it. + Requires one positional argument ``timeout`` . Class definition and instanciation @@ -113,19 +111,13 @@ The connector then becomes: def _cc_close(self): self.my_connection.close() - def _cc_send(self, data: Union[Data, bytes], raw = False): - if raw: - data_bytes = data - else: - data_bytes = data.serialize() + def _cc_send(self, data: bytes): self.my_connection.send(data_bytes) - def _cc_receive(self, timeout, raw = False): + def _cc_receive(self, timeout) -> Optional[bytes]: received_data = self.my_connection.receive(timeout=timeout) if received_data: - if not raw: - data = Data.deserialize(received_data) - return data + return received_data .. note:: The API used in this example for the fictive *my_connection* module @@ -146,15 +138,11 @@ see the example below with the cc_pcan_can connector and the return of the remot .. code:: python def _cc_receive( - self, timeout: float = 0.0001, raw: bool = False + self, timeout: float = 0.0001 ) -> Dict[str, Union[MessageType, int]]: """Receive a can message using configured filters. - If raw parameter is set to True return received message as it is (bytes) - otherwise test entity protocol format is used and Message class type is returned. - :param timeout: timeout applied on reception - :param raw: boolean use to select test entity protocol format :return: the received data and the source can id """ @@ -165,8 +153,6 @@ see the example below with the cc_pcan_can connector and the return of the remot frame_id = received_msg.arbitration_id payload = received_msg.data timestamp = received_msg.timestamp - if not raw: - payload = Message.parse_packet(payload) log.internal_debug(f"received CAN Message: {frame_id}, {payload}, {timestamp}") return {"msg": payload, "remote_id": frame_id} else: @@ -186,15 +172,12 @@ see example below with the cc_pcan_can connector and the additional remote_id pa .. code:: python - def _cc_send(self, msg: MessageType, raw: bool = False, **kwargs) -> None: + def _cc_send(self, msg: MessageType, **kwargs) -> None: """Send a CAN message at the configured id. - If remote_id parameter is not given take configured ones, in addition if - raw is set to True take the msg parameter as it is otherwise parse it using - test entity protocol format. + If remote_id parameter is not given take configured ones :param msg: data to send - :param raw: boolean use to select test entity protocol format :param kwargs: named arguments """ diff --git a/docs/whats_new/version_ongoing.rst b/docs/whats_new/version_ongoing.rst index c7d1aaf6..a9de5eac 100644 --- a/docs/whats_new/version_ongoing.rst +++ b/docs/whats_new/version_ongoing.rst @@ -16,3 +16,13 @@ Ykush Auxiliary Auxiliary that can be used to power on and off the ports of an Ykush USB Hub. See :ref:`ykush_auxiliary` + + +Internal creation of proxy auxiliaries +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +It is no longer necessary to manually defined a ``ProxyAuxiliary`` with +``CCProxy``s yourself. If you simply pass the communication channel to +each auxiliary that has to share it, ``pykiso`` will do the rest for you. + + diff --git a/examples/test_stepreport/stepreport_example.py b/examples/test_stepreport/stepreport_example.py index 3eb127ca..6eed2062 100644 --- a/examples/test_stepreport/stepreport_example.py +++ b/examples/test_stepreport/stepreport_example.py @@ -109,3 +109,61 @@ def test_run(self): self.assertAlmostEqual(voltage, 4, delta=1, msg="Check voltage device") logging.info(f"I HAVE RUN 0.1.1 for tag {self.tag}!") + + +@pykiso.define_test_parameters( + suite_id=1, + case_id=3, +) +class WrapLongTextTest(pykiso.BasicTest): + """This test shows wrapping of long results into foldable html elements""" + + def setUp(self): + """Set header information and check setup conditions""" + super().setUp() + # additional data to include in the step-report + self.step_report.header["Version_device"] = "2022-1234" + + def test_run(self): + """Write long results to the step report to show how foldable html + elements get used to make the report overview more readable + """ + logging.info( + f"--------------- RUN: {self.test_suite_id}, {self.test_case_id} ---------------" + ) + + # data to test + device_on = True + actual_dummy_result = {"result": True} + expected_dummy_result = { + "result": "pykiso is an integration test framework. With it, it is possible to write: Whitebox integration tests directly on my target device, Graybox integration tests to make sure the communication-link with my target device is working as expected, Blackbox integration tests to make sure my external device interfaces are working as expected", + } + + # check that it works with multiple tables + self.step_report.current_table = "First table" + + self.assertEqual( + expected_dummy_result, + expected_dummy_result, + msg="The very long message should be wrapped", + ) + + self.step_report.current_table = "Second table" + + with self.subTest("Non critical checks"): + # This check will fail but the test continues + self.assertFalse(device_on, msg="Some check") + + # assert with custom message + # assert msg overwritten when step_report_message not null + self.step_report.message = "Big data message" + + self.assertEqual( + expected_dummy_result, expected_dummy_result, msg="Check big data" + ) + + self.assertEqual( + expected_dummy_result, actual_dummy_result, msg="Check big data" + ) + + logging.info(f"I HAVE RUN 0.1.1 for tag {self.tag}!") diff --git a/src/pykiso/auxiliary.py b/src/pykiso/auxiliary.py index c9cbb0e1..d100b93d 100644 --- a/src/pykiso/auxiliary.py +++ b/src/pykiso/auxiliary.py @@ -26,9 +26,9 @@ import time from typing import Any -from .logging_initializer import add_logging_level -from .test_setup.config_registry import ConfigRegistry -from .types import MsgType +from pykiso.logging_initializer import add_internal_log_levels +from pykiso.test_setup.config_registry import ConfigRegistry +from pykiso.types import MsgType log = logging.getLogger(__name__) @@ -38,18 +38,9 @@ class AuxiliaryCommon(metaclass=abc.ABCMeta): multiprocessing and thread auxiliary interface. """ - def __new__(cls: AuxiliaryCommon, *args, **kwargs) -> AuxiliaryCommon: - """Create instance and add internal kiso log levels in - case the auxiliary is used outside the pykiso context - """ - if not hasattr(logging, "INTERNAL_WARNING"): - add_logging_level("INTERNAL_WARNING", logging.WARNING + 1) - add_logging_level("INTERNAL_INFO", logging.INFO + 1) - add_logging_level("INTERNAL_DEBUG", logging.DEBUG + 1) - return super(AuxiliaryCommon, cls).__new__(cls) - def __init__(self) -> None: """Auxiliary common attributes initialization.""" + add_internal_log_levels() self.name = None self.queue_in = None self.lock = None diff --git a/src/pykiso/connector.py b/src/pykiso/connector.py index 6f80150a..028ac6e8 100644 --- a/src/pykiso/connector.py +++ b/src/pykiso/connector.py @@ -20,12 +20,16 @@ """ import abc +import logging import multiprocessing import pathlib import threading +from typing import Dict, Optional from .types import MsgType, PathType +log = logging.getLogger(__name__) + class Connector(abc.ABC): """Abstract interface for all connectors to inherit from. @@ -96,28 +100,35 @@ def close(self) -> None: with self._lock: self._cc_close() - def cc_send(self, msg: MsgType, raw: bool = False, **kwargs) -> None: + def cc_send(self, msg: MsgType, *args, **kwargs) -> None: """Send a thread-safe message on the channel and wait for an acknowledgement. :param msg: message to send - :param raw: should the message be converted as pykiso.Message - or sent as it is :param kwargs: named arguments """ + if ("raw" in kwargs) or args: + log.internal_warning( + "Use of 'raw' keyword argument is deprecated. It won't be passed to '_cc_send'." + ) with self._lock_tx: - self._cc_send(msg=msg, raw=raw, **kwargs) + self._cc_send(msg=msg, **kwargs) - def cc_receive(self, timeout: float = 0.1, raw: bool = False) -> dict: + def cc_receive( + self, timeout: float = 0.1, *args, **kwargs + ) -> Dict[str, Optional[bytes]]: """Read a thread-safe message on the channel and send an acknowledgement. :param timeout: time in second to wait for reading a message - :param raw: should the message be returned as pykiso.Message or - sent as it is + :param kwargs: named arguments :return: the received message """ + if ("raw" in kwargs) or args: + log.internal_warning( + "Use of 'raw' keyword argument is deprecated. It won't be passed to '_cc_receive'." + ) with self._lock_rx: - return self._cc_receive(timeout=timeout, raw=raw) + return self._cc_receive(timeout=timeout, **kwargs) @abc.abstractmethod def _cc_open(self) -> None: @@ -130,23 +141,21 @@ def _cc_close(self) -> None: pass @abc.abstractmethod - def _cc_send(self, msg: MsgType, raw: bool = False, **kwargs) -> None: + def _cc_send(self, msg: MsgType, **kwargs) -> None: """Sends the message on the channel. :param msg: Message to send out - :param raw: send raw message without further work (default: False) :param kwargs: named arguments """ pass @abc.abstractmethod - def _cc_receive(self, timeout: float, raw: bool = False) -> dict: + def _cc_receive(self, timeout: float, **kwargs) -> Dict[str, Optional[bytes]]: """How to receive something from the channel. :param timeout: Time to wait in second for a message to be received - :param raw: send raw message without further work (default: False) - - :return: message.Message() - If one received / None - If not + :param kwargs: named arguments + :return: dictionary containing the received bytes if successful, otherwise None """ pass diff --git a/src/pykiso/interfaces/dt_auxiliary.py b/src/pykiso/interfaces/dt_auxiliary.py index 4f80e48b..5a863018 100644 --- a/src/pykiso/interfaces/dt_auxiliary.py +++ b/src/pykiso/interfaces/dt_auxiliary.py @@ -28,7 +28,7 @@ from typing import Any, Callable, List, Optional from ..exceptions import AuxiliaryCreationError -from ..logging_initializer import add_logging_level, initialize_loggers +from ..logging_initializer import add_internal_log_levels, initialize_loggers log = logging.getLogger(__name__) @@ -49,16 +49,6 @@ class DTAuxiliaryInterface(abc.ABC): for the reception and one for the transmmission. """ - def __new__(cls, *args, **kwargs): - """Create instance and add internal kiso log levels in - case the auxiliary is used outside the pykiso context - """ - if not hasattr(logging, "INTERNAL_WARNING"): - add_logging_level("INTERNAL_WARNING", logging.WARNING + 1) - add_logging_level("INTERNAL_INFO", logging.INFO + 1) - add_logging_level("INTERNAL_DEBUG", logging.DEBUG + 1) - return super(DTAuxiliaryInterface, cls).__new__(cls) - def __init__( self, name: str = None, @@ -82,10 +72,11 @@ def __init__( :param auto_start: determine if the auxiliayry is automatically started (magic import) or manually (by user) """ + initialize_loggers(activate_log) + add_internal_log_levels() self.name = name self.is_proxy_capable = is_proxy_capable self.auto_start = auto_start - initialize_loggers(activate_log) self.lock = threading.RLock() self.stop_tx = threading.Event() self.stop_rx = threading.Event() @@ -209,7 +200,7 @@ def _start_rx_task(self) -> None: log.internal_debug("reception task is not needed, don't start it") return - log.internal_debug(f"start reception task {self.name}_tx") + log.internal_debug(f"start reception task {self.name}_rx") self.rx_thread = threading.Thread( name=f"{self.name}_rx", target=self._reception_task ) diff --git a/src/pykiso/lib/auxiliaries/communication_auxiliary.py b/src/pykiso/lib/auxiliaries/communication_auxiliary.py index 6449436a..26df5cfe 100644 --- a/src/pykiso/lib/auxiliaries/communication_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/communication_auxiliary.py @@ -202,7 +202,7 @@ def _run_command(self, cmd_message: str, cmd_data: bytes = None) -> bool: state = False if cmd_message == "send": try: - self.channel.cc_send(msg=cmd_data, raw=True) + self.channel.cc_send(msg=cmd_data) state = True except Exception: log.exception( @@ -223,7 +223,7 @@ def _receive_message(self, timeout_in_s: float) -> None: for a message """ try: - rcv_data = self.channel.cc_receive(timeout=timeout_in_s, raw=True) + rcv_data = self.channel.cc_receive(timeout=timeout_in_s) log.internal_debug(f"received message '{rcv_data}' from {self.channel}") msg = rcv_data.get("msg") if msg is not None and self.queueing_event.is_set(): diff --git a/src/pykiso/lib/auxiliaries/dut_auxiliary.py b/src/pykiso/lib/auxiliaries/dut_auxiliary.py index bc5604bc..b26202a6 100644 --- a/src/pykiso/lib/auxiliaries/dut_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/dut_auxiliary.py @@ -19,7 +19,6 @@ .. currentmodule:: dut_auxiliary """ - from __future__ import annotations import functools @@ -27,14 +26,13 @@ import queue from typing import Callable, Optional -from pykiso import CChannel, Flasher, message +from pykiso import CChannel, Flasher, Message, message from pykiso.interfaces.dt_auxiliary import ( DTAuxiliaryInterface, close_connector, flash_target, open_connector, ) -from pykiso.types import MsgType log = logging.getLogger(__name__) @@ -333,7 +331,7 @@ def evaluate_report(self, report_msg: message.Message) -> None: def wait_and_get_report( self, blocking: bool = False, timeout_in_s: int = 0 - ) -> Optional[message.Message]: + ) -> Optional[Message]: """Wait for the report coming from the DUT. :param blocking: True: wait for timeout to expire, False: return @@ -353,7 +351,9 @@ def wait_and_get_report( return None def _run_command( - self, cmd_message: message.Message, cmd_data: bytes = None + self, + cmd_message: message.Message, + cmd_data: bytes = None, ) -> None: """Simply send the given command using the associated channel. @@ -361,7 +361,8 @@ def _run_command( :param cmd_data: not use """ try: - self.channel.cc_send(msg=cmd_message, raw=False) + # Serialize and send the message + self.channel.cc_send(msg=cmd_message.serialize()) except Exception: log.exception( f"encountered error while sending message '{cmd_message}' to {self.channel}" @@ -372,21 +373,24 @@ def _receive_message(self, timeout_in_s: float) -> None: :param timeout_in_s: Time in seconds to wait for an answer """ - recv = self.channel.cc_receive(timeout_in_s) - response = recv.get("msg") + + recv_response = self.channel.cc_receive(timeout_in_s) + response = recv_response.get("msg") if response is None: return + elif isinstance(response, str): + response = response.encode() + response = Message.parse_packet(response) # If a message was received just automatically acknowledge it # and populate the queue_out if response.msg_type != MESSAGE_TYPE.ACK: ack_cmd = response.generate_ack_message(message.MessageAckType.ACK) try: - self.channel.cc_send(msg=ack_cmd, raw=False) + self.channel.cc_send(msg=ack_cmd.serialize()) except Exception: log.exception( f"encountered error while sending acknowledge message for {response}!" ) - self.queue_out.put(response) diff --git a/src/pykiso/lib/auxiliaries/instrument_control_auxiliary/instrument_control_auxiliary.py b/src/pykiso/lib/auxiliaries/instrument_control_auxiliary/instrument_control_auxiliary.py index 4c0f61c3..9c293faa 100644 --- a/src/pykiso/lib/auxiliaries/instrument_control_auxiliary/instrument_control_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/instrument_control_auxiliary/instrument_control_auxiliary.py @@ -96,7 +96,7 @@ def handle_write( """ log.internal_debug(f"Sending a write request in {self} for {write_command}") # Send the message with the termination character - self.channel.cc_send(msg=write_command + self.write_termination, raw=False) + self.channel.cc_send(msg=write_command + self.write_termination) if validation is not None: # Check that the writing request was successfully performed on the instrument @@ -168,7 +168,7 @@ def handle_read(self) -> str: :return: received response from instrument otherwise empty string """ - response = self.channel.cc_receive(raw=False) + response = self.channel.cc_receive() return response.get("msg") def query(self, query_command: str) -> Union[bytes, str]: @@ -196,9 +196,9 @@ def handle_query(self, query_command: str) -> str: response = self.channel.query(query_command + self.write_termination) return response.get("msg") else: - self.channel.cc_send(msg=query_command + self.write_termination, raw=False) + self.channel.cc_send(msg=query_command + self.write_termination) time.sleep(0.05) - response = self.channel.cc_receive(raw=False) + response = self.channel.cc_receive() return response.get("msg") def _create_auxiliary_instance(self) -> bool: diff --git a/src/pykiso/lib/auxiliaries/mp_proxy_auxiliary.py b/src/pykiso/lib/auxiliaries/mp_proxy_auxiliary.py index 67b5dd81..ef39822a 100644 --- a/src/pykiso/lib/auxiliaries/mp_proxy_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/mp_proxy_auxiliary.py @@ -57,6 +57,7 @@ from typing import List, Optional, Tuple from pykiso import AuxiliaryInterface, CChannel, MpAuxiliaryInterface +from pykiso.lib.connectors.cc_mp_proxy import CCMpProxy from pykiso.test_setup.config_registry import ConfigRegistry from pykiso.test_setup.dynamic_loader import PACKAGE @@ -95,13 +96,13 @@ def __init__( activate=activate_trace, dir=trace_dir, name=trace_name ) self.proxy_channels = self.get_proxy_con(aux_list) - self.logger = None + self.logger: logging.Logger = None self.aux_list = aux_list super().__init__(**kwargs) def _init_trace( self, - logger, + logger: logging.Logger, activate: bool, t_dir: Optional[str] = None, t_name: Optional[str] = None, @@ -144,7 +145,7 @@ def _init_trace( handler.setLevel(logging.DEBUG) logger.addHandler(handler) - def get_proxy_con(self, aux_list: List[str]) -> Tuple[AuxiliaryInterface]: + def get_proxy_con(self, aux_list: List[str]) -> Tuple[CCMpProxy]: """Retrieve all connector associated to all given existing Auxiliaries. If auxiliary alias exists but auxiliary instance was not created @@ -155,7 +156,7 @@ def get_proxy_con(self, aux_list: List[str]) -> Tuple[AuxiliaryInterface]: :return: tuple containing all connectors associated to all given auxiliaries """ - channel_inst = [] + channel_inst: List[CCMpProxy] = [] for aux_name in aux_list: aux_inst = sys.modules.get(f"{PACKAGE}.auxiliaries.{aux_name}") @@ -177,6 +178,14 @@ def get_proxy_con(self, aux_list: List[str]) -> Tuple[AuxiliaryInterface]: else: log.error(f"Auxiliary : {aux_name} doesn't exist") + # Check if auxes/connectors are compatible with the proxy aux + self._check_channels_compatibility(channel_inst) + + # Finally bind the physical channel to the proxy channels to + # share its API to the user's auxiliaries + for channel in channel_inst: + channel._bind_channel_info(self) + return tuple(channel_inst) @staticmethod @@ -192,6 +201,22 @@ def _check_compatibility(aux: AuxiliaryInterface) -> None: f"Auxiliary {aux} is not compatible with a proxy auxiliary" ) + @staticmethod + def _check_channels_compatibility(channels: List[CChannel]) -> None: + """Check if all associated channels are compatible. + + :param channels: all channels collected by the proxy aux + + :raises TypeError: if the connector is not an instance of + CCProxy + """ + for channel in channels: + if not isinstance(channel, CCMpProxy): + raise TypeError( + f"Channel {channel} is not compatible! " + f"Expected a CCMpProxy instance, got {channel.__class__.__name__}" + ) + def _create_auxiliary_instance(self) -> bool: """Open current associated channel. @@ -282,7 +307,7 @@ def _receive_message(self, timeout_in_s: float = 0) -> None: for a message. """ try: - recv_response = self.channel.cc_receive(timeout=timeout_in_s, raw=True) + recv_response = self.channel.cc_receive(timeout=timeout_in_s) received_data = recv_response.get("msg") # if data are received, populate connected proxy connectors # queue out diff --git a/src/pykiso/lib/auxiliaries/proxy_auxiliary.py b/src/pykiso/lib/auxiliaries/proxy_auxiliary.py index ebd4a699..1f827a98 100644 --- a/src/pykiso/lib/auxiliaries/proxy_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/proxy_auxiliary.py @@ -144,7 +144,7 @@ def _init_trace( return logger - def get_proxy_con(self, aux_list: List[str]) -> Tuple: + def get_proxy_con(self, aux_list: List[str]) -> Tuple[CCProxy, ...]: """Retrieve all connector associated to all given existing Auxiliaries. If auxiliary alias exists but auxiliary instance was not created @@ -155,7 +155,7 @@ def get_proxy_con(self, aux_list: List[str]) -> Tuple: :return: tuple containing all connectors associated to all given auxiliaries """ - channel_inst = [] + channel_inst: List[CCProxy] = [] for aux in aux_list: # aux_list can contain a auxiliary instance just grab the @@ -184,10 +184,15 @@ def get_proxy_con(self, aux_list: List[str]) -> Tuple: # invalid one else: log.error(f"Auxiliary '{aux}' doesn't exist") - # Finally just check if auxes/connectors are compatible with - # the proxy aux + + # Check if auxes/connectors are compatible with the proxy aux self._check_channels_compatibility(channel_inst) + # Finally bind the physical channel to the proxy channels to + # share its API to the user's auxiliaries + for channel in channel_inst: + channel._bind_channel_info(self) + return tuple(channel_inst) @staticmethod @@ -300,7 +305,7 @@ def _receive_message(self, timeout_in_s: float = 0) -> None: for a message """ try: - recv_response = self.channel.cc_receive(timeout=timeout_in_s, raw=True) + recv_response = self.channel.cc_receive(timeout=timeout_in_s) received_data = recv_response.get("msg") # if data are received, populate connector's queue_out if received_data is not None: diff --git a/src/pykiso/lib/auxiliaries/record_auxiliary.py b/src/pykiso/lib/auxiliaries/record_auxiliary.py index 1cc12a6e..7f67ffe8 100644 --- a/src/pykiso/lib/auxiliaries/record_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/record_auxiliary.py @@ -189,7 +189,7 @@ def receive(self) -> None: if sys.getsizeof(self.get_data()) > self.max_file_size: log.error("Data size too large") - recv_response = self.channel.cc_receive(timeout=self.timeout, raw=True) + recv_response = self.channel.cc_receive(timeout=self.timeout) stream = recv_response.get("msg") source = recv_response.get("remote_id") diff --git a/src/pykiso/lib/auxiliaries/udsaux/uds_auxiliary.py b/src/pykiso/lib/auxiliaries/udsaux/uds_auxiliary.py index 0fb1661e..49e39882 100644 --- a/src/pykiso/lib/auxiliaries/udsaux/uds_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/udsaux/uds_auxiliary.py @@ -85,7 +85,7 @@ def transmit(self, data: bytes, req_id: int, extended: bool = False) -> None: :param extended: True if addressing mode is extended otherwise False """ - self.channel._cc_send(msg=data, remote_id=req_id, raw=True) + self.channel._cc_send(msg=data, remote_id=req_id) def send_uds_raw( self, @@ -330,7 +330,7 @@ def _receive_message(self, timeout_in_s: float) -> None: :param timeout_in_s: timeout on reception. """ - recv_response = self.channel.cc_receive(timeout=timeout_in_s, raw=True) + recv_response = self.channel.cc_receive(timeout=timeout_in_s) received_data = recv_response.get("msg") arbitration_id = recv_response.get("remote_id") diff --git a/src/pykiso/lib/auxiliaries/udsaux/uds_server_auxiliary.py b/src/pykiso/lib/auxiliaries/udsaux/uds_server_auxiliary.py index 37e3eedc..9801bc5c 100644 --- a/src/pykiso/lib/auxiliaries/udsaux/uds_server_auxiliary.py +++ b/src/pykiso/lib/auxiliaries/udsaux/uds_server_auxiliary.py @@ -121,7 +121,7 @@ def transmit( """ req_id = req_id or self.req_id data = self._pad_message(data) - self.channel._cc_send(msg=data, remote_id=req_id, raw=True) + self.channel._cc_send(msg=data, remote_id=req_id) def receive(self) -> Optional[bytes]: """Receive a message through ITF connector. Called inside a thread, @@ -130,7 +130,7 @@ def receive(self) -> Optional[bytes]: :return: the received message or None. """ - rcv_data = self.channel._cc_receive(timeout=0, raw=True) + rcv_data = self.channel._cc_receive(timeout=0) msg, arbitration_id = rcv_data.get("msg"), rcv_data.get("remote_id") if msg is not None and arbitration_id == self.res_id: return msg @@ -255,7 +255,7 @@ def _receive_message(self, timeout_in_s: float) -> None: :param timeout_in_s: timeout on reception. """ - rcv_data = self.channel.cc_receive(timeout_in_s, raw=True) + rcv_data = self.channel.cc_receive(timeout_in_s) msg, arbitration_id = rcv_data.get("msg"), rcv_data.get("remote_id") if msg is not None and arbitration_id == self.res_id: try: diff --git a/src/pykiso/lib/connectors/cc_example.py b/src/pykiso/lib/connectors/cc_example.py index f1a1335d..1909ccdb 100644 --- a/src/pykiso/lib/connectors/cc_example.py +++ b/src/pykiso/lib/connectors/cc_example.py @@ -52,19 +52,16 @@ def _cc_close(self) -> None: """Close the channel.""" log.internal_info("close channel") - def _cc_send(self, msg: message.Message, raw: bool = False) -> None: + def _cc_send(self, msg: bytes) -> None: """Sends the message on the channel. - :param msg: message to send, should be Message type like. - :param raw: if raw is false serialize it using Message serialize. + :param msg: message to send. - :raise NotImplementedError: sending raw bytes is not supported. """ with self.lock: - if raw: - raise NotImplementedError() log.internal_debug("Send: {}".format(msg)) # Exit if ack sent + msg = message.Message.parse_packet(msg) if msg.get_message_type() == message.MessageType.ACK: return @@ -80,42 +77,38 @@ def _cc_send(self, msg: message.Message, raw: bool = False) -> None: self.report_requested_message = msg.serialize() - def _cc_receive( - self, timeout: float = 0.1, raw: bool = False - ) -> Dict[str, Optional[message.Message]]: - """Reads from the channel - decorator usage for test. + def _cc_receive(self, timeout: float = 0.1) -> Dict[str, Optional[bytes]]: + """Craft a Message that would be received from the device under test. :param timeout: not use - :param raw: if raw is false serialize it using Message serialize. - :raise NotImplementedError: receiving raw bytes is not supported. - - :return: Message if successful, otherwise None + :return: dictionary containing the received bytes if successful, otherwise None """ with self.lock: - if raw: - raise NotImplementedError() + received_message = None # Simulate a real wait for message time.sleep(1e-3) if self.last_received_message is not None: # Transform into ack - r_message = message.Message.parse_packet(self.last_received_message) - r_message.msg_type = message.MessageType.ACK - r_message.sub_type = message.MessageAckType.ACK + received_message = message.Message.parse_packet( + self.last_received_message + ) + received_message.msg_type = message.MessageType.ACK + received_message.sub_type = message.MessageAckType.ACK # Delete the stored raw message + received_message = received_message.serialize() self.last_received_message = None - # Return the ACK - log.internal_debug("Receive: {}".format(r_message)) - return {"msg": r_message} + log.internal_debug("Receive: {}".format(received_message)) elif self.report_requested_message is not None: # Transform message to ACK - r_message = message.Message.parse_packet(self.report_requested_message) - r_message.msg_type = message.MessageType.REPORT - r_message.sub_type = message.MessageReportType.TEST_PASS + received_message = message.Message.parse_packet( + self.report_requested_message + ) + received_message.msg_type = message.MessageType.REPORT + received_message.sub_type = message.MessageReportType.TEST_PASS # Delete the stored raw message + received_message = received_message.serialize() self.report_requested_message = None - # Return REPORT - log.internal_debug("Receive: {}".format(r_message)) - return {"msg": r_message} - else: - return {"msg": None} + log.internal_debug("Receive: {}".format(received_message)) + + return {"msg": received_message} diff --git a/src/pykiso/lib/connectors/cc_fdx_lauterbach.py b/src/pykiso/lib/connectors/cc_fdx_lauterbach.py index fa5aa713..a69d2ea5 100644 --- a/src/pykiso/lib/connectors/cc_fdx_lauterbach.py +++ b/src/pykiso/lib/connectors/cc_fdx_lauterbach.py @@ -24,7 +24,7 @@ import logging import subprocess import time -from typing import Dict, Union +from typing import Dict, Optional, Union from pykiso import connector from pykiso.message import Message @@ -236,21 +236,16 @@ def _cc_close(self) -> None: # Close Trace32 application self.t32_api.T32_Cmd("QUIT".encode("latin-1")) - def _cc_send(self, msg: Message or bytes, raw: bool = False) -> int: + def _cc_send(self, msg: bytes) -> int: """Sends a message using FDX channel. :param msg: message - :param raw: boolean precising the message type (encoded or not) :return: poll length """ log.internal_debug(f"===> {msg}") log.internal_debug(f"Sent on channel {self.fdxout}") - # Encode message if it is raw - if not raw: - msg = msg.serialize() - # Create and fill the buffer with the message buffer = ctypes.pointer(ctypes.create_string_buffer(len(msg))) buffer.contents.raw = msg @@ -263,13 +258,9 @@ def _cc_send(self, msg: Message or bytes, raw: bool = False) -> int: ) return poll_len - def _cc_receive( - self, timeout: float = 0.1, raw: bool = False - ) -> Dict[str, Union[bytes, str, None]]: + def _cc_receive(self, timeout: float = 0.1) -> Dict[str, Union[bytes, str, None]]: """Receive message using the FDX channel. - :param raw: boolean precising the message type - :return: message """ # Add a small delay to allow other functions to execute diff --git a/src/pykiso/lib/connectors/cc_mp_proxy.py b/src/pykiso/lib/connectors/cc_mp_proxy.py index eb70e6a6..035af6f7 100644 --- a/src/pykiso/lib/connectors/cc_mp_proxy.py +++ b/src/pykiso/lib/connectors/cc_mp_proxy.py @@ -22,17 +22,17 @@ .. currentmodule:: cc_mp_proxy """ - from __future__ import annotations import logging import multiprocessing import queue -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from pykiso.connector import CChannel if TYPE_CHECKING: + from pykiso.lib.auxiliaries.mp_proxy_auxiliary import MpProxyAuxiliary from pykiso.types import ProxyReturn log = logging.getLogger(__name__) @@ -43,11 +43,53 @@ class CCMpProxy(CChannel): def __init__(self, **kwargs): """Initialize attributes.""" + kwargs.update(processing=True) super().__init__(**kwargs) # instantiate directly both queue_in and queue_out self.queue_in = multiprocessing.Queue() self.queue_out = multiprocessing.Queue() self.timeout = 1 + # used for physical channel attributes access from main auxiliary + self._proxy = None + self._physical_channel = None + + def _bind_channel_info(self, proxy_aux: MpProxyAuxiliary): + """Bind a :py:class:`~pykiso.lib.auxiliaries.mp_proxy_auxiliary.MpProxyAuxiliary` + instance that is instanciated in order to handle the connection of + multiple auxiliaries to a single communication channel in order to + hide the underlying proxy setup. + + :param proxy_aux: the proxy auxiliary instance that is holding the + real communication channel. + """ + self._proxy = proxy_aux + self._physical_channel = proxy_aux.channel + + def __getattr__(self, name: str) -> Any: + """Implement getattr to retrieve attributes from the real channel attached + to the underlying :py:class:`~pykiso.lib.auxiliaries.mp_proxy_auxiliary.MpProxyAuxiliary`. + + :param name: name of the attribute to get. + :raises AttributeError: if the attribute is not part of the real + channel instance or if the real channel hasn't been bound to + this proxy channel yet. + :return: the found attribute value. + """ + if self._physical_channel is not None: + with self._proxy.lock: + return getattr(self._physical_channel, name) + raise AttributeError( + f"{self.__class__.__name__} object has no attribute '{name}'" + ) + + def __getstate__(self): + """Avoid getattr to be called on pickling before instanciation, + which would cause infinite recursion. + """ + return self.__dict__ + + def __setstate__(self, state): + self.__dict__ = state def _cc_open(self) -> None: """Open proxy channel. @@ -76,13 +118,12 @@ def _cc_send(self, *args: tuple, **kwargs: dict) -> None: log.internal_debug(f"put at proxy level: {args} {kwargs}") self.queue_in.put((args, kwargs)) - def _cc_receive(self, timeout: float = 0.1, raw: bool = False) -> ProxyReturn: + def _cc_receive(self, timeout: float = 0.1) -> ProxyReturn: """Depopulate the queue out of the proxy connector. :param timeout: not used - :param raw: not used - :return: raw bytes and source when it exist. if queue timeout + :return: bytes and source when it exist. if queue timeout is reached return None """ try: diff --git a/src/pykiso/lib/connectors/cc_pcan_can.py b/src/pykiso/lib/connectors/cc_pcan_can.py index bb3a3eea..788bf07e 100644 --- a/src/pykiso/lib/connectors/cc_pcan_can.py +++ b/src/pykiso/lib/connectors/cc_pcan_can.py @@ -22,7 +22,7 @@ import logging import sys from pathlib import Path -from typing import Dict, Union +from typing import Dict, Optional, Union import can import can.bus @@ -303,48 +303,37 @@ def _cc_close(self) -> None: finally: self.raw_pcan_interface = None - def _cc_send(self, msg: MessageType, raw: bool = False, **kwargs) -> None: + def _cc_send( + self, msg: MessageType, remote_id: Optional[int] = None, **kwargs + ) -> None: """Send a CAN message at the configured id. - If remote_id parameter is not given take configured ones, in addition if - raw is set to True take the msg parameter as it is otherwise parse it using - test entity protocol format. + If remote_id parameter is not given take configured ones :param msg: data to send - :param raw: boolean use to select test entity protocol format + :param remote_id: destination can id used :param kwargs: named arguments - """ - _data = msg - remote_id = kwargs.get("remote_id") - - if remote_id is None: - remote_id = self.remote_id - if not raw: - _data = msg.serialize() + remote_id = remote_id or self.remote_id can_msg = can.Message( arbitration_id=remote_id, - data=_data, + data=msg, is_extended_id=self.is_extended_id, is_fd=self.is_fd, bitrate_switch=self.enable_brs, ) self.bus.send(can_msg) - log.internal_debug(f"{self} sent CAN Message: {can_msg}, data: {_data}") + log.internal_debug(f"{self} sent CAN Message: {can_msg}, data: {msg}") def _cc_receive( - self, timeout: float = 0.0001, raw: bool = False - ) -> Dict[str, Union[MessageType, int]]: + self, timeout: float = 0.0001 + ) -> Dict[str, Union[bytes, int, None]]: """Receive a can message using configured filters. - If raw parameter is set to True return received message as it is (bytes) - otherwise test entity protocol format is used and Message class type is returned. - :param timeout: timeout applied on reception - :param raw: boolean use to select test entity protocol format :return: the received data and the source can id """ @@ -355,8 +344,7 @@ def _cc_receive( frame_id = received_msg.arbitration_id payload = received_msg.data timestamp = received_msg.timestamp - if not raw: - payload = Message.parse_packet(payload) + log.internal_debug( f"received CAN Message: {frame_id}, {payload}, {timestamp}" ) diff --git a/src/pykiso/lib/connectors/cc_process.py b/src/pykiso/lib/connectors/cc_process.py index 766e86c5..be9ba000 100644 --- a/src/pykiso/lib/connectors/cc_process.py +++ b/src/pykiso/lib/connectors/cc_process.py @@ -27,9 +27,8 @@ import subprocess import threading from dataclasses import dataclass -from typing import IO, ByteString, Callable, Dict, List, Optional, Tuple, Union +from typing import IO, ByteString, List, Optional, Union -from pykiso import Message from pykiso.connector import CChannel log = logging.getLogger(__name__) @@ -188,11 +187,10 @@ def _cc_close(self) -> None: """Close the channel.""" self._cleanup() - def _cc_send(self, msg: MessageType, raw: bool = False, **kwargs) -> None: + def _cc_send(self, msg: MessageType, **kwargs) -> None: """Execute process commands or write data to stdin :param msg: data to send - :param raw: unused :raises CCProcessError: Stdin pipe is not enabled @@ -284,11 +282,11 @@ def _create_message_dict(msg: Union[ProcessMessage, ProcessExit]) -> dict: ret = {"msg": {"exit": msg.exit_code}} return ret - def _cc_receive(self, timeout: float = 0.0001, raw: bool = False) -> MessageType: + def _cc_receive(self, timeout: float = 0.0001) -> MessageType: """Receive messages :param timeout: Time to wait in seconds for a message to be received - :param raw: unused + :param size: unused return The received message """ diff --git a/src/pykiso/lib/connectors/cc_proxy.py b/src/pykiso/lib/connectors/cc_proxy.py index a70b59b6..b4774c8f 100644 --- a/src/pykiso/lib/connectors/cc_proxy.py +++ b/src/pykiso/lib/connectors/cc_proxy.py @@ -32,6 +32,7 @@ from pykiso.connector import CChannel if TYPE_CHECKING: + from pykiso.lib.auxiliaries.proxy_auxiliary import ProxyAuxiliary from pykiso.types import ProxyReturn @@ -48,6 +49,39 @@ def __init__(self, **kwargs): self.timeout = 1 self._lock = threading.Lock() self._tx_callback = None + self._proxy = None + self._physical_channel = None + + def _bind_channel_info(self, proxy_aux: ProxyAuxiliary): + """Bind a :py:class:`~pykiso.lib.auxiliaries.proxy_auxiliary.ProxyAuxiliary` + instance that is instanciated in order to handle the connection of + multiple auxiliaries to a single communication channel. + + This allows to access the real communication channel's attributes + and hides the underlying proxy setup. + + :param proxy_aux: the proxy auxiliary instance that is holding the + real communication channel. + """ + self._proxy = proxy_aux + self._physical_channel = proxy_aux.channel + + def __getattr__(self, name: str) -> Any: + """Implement getattr to retrieve attributes from the real channel attached + to the underlying :py:class:`~pykiso.lib.auxiliaries.proxy_auxiliary.ProxyAuxiliary`. + + :param name: name of the attribute to get. + :raises AttributeError: if the attribute is not part of the real + channel instance or if the real channel hasn't been bound to + this proxy channel yet. + :return: the found attribute value. + """ + if self._physical_channel is not None: + with self._proxy.lock: + return getattr(self._physical_channel, name) + raise AttributeError( + f"{self.__class__.__name__} object has no attribute '{name}'" + ) def detach_tx_callback(self) -> None: """Detach the current callback.""" @@ -90,13 +124,12 @@ def _cc_send(self, *args: Any, **kwargs: Any) -> None: # call the attached ProxyAuxiliary's run_command method self._tx_callback(self, *args, **kwargs) - def _cc_receive(self, timeout: float = 0.1, raw: bool = False) -> ProxyReturn: + def _cc_receive(self, timeout: float = 0.1) -> ProxyReturn: """Depopulate the queue out of the proxy connector. :param timeout: not used - :param raw: not used - :return: raw bytes and source when it exist. if queue timeout + :return: bytes and source when it exist. if queue timeout is reached return None """ try: diff --git a/src/pykiso/lib/connectors/cc_raw_loopback.py b/src/pykiso/lib/connectors/cc_raw_loopback.py index 5feafc93..a6a30f61 100644 --- a/src/pykiso/lib/connectors/cc_raw_loopback.py +++ b/src/pykiso/lib/connectors/cc_raw_loopback.py @@ -20,7 +20,7 @@ """ import threading from collections import deque -from typing import Dict +from typing import Dict, Optional from pykiso import CChannel from pykiso.types import MsgType @@ -46,22 +46,20 @@ def _cc_close(self) -> None: """Close loopback channel.""" self._loopback_buffer = None - def _cc_send(self, msg: MsgType, raw: bool = True) -> None: + def _cc_send(self, msg: MsgType) -> None: """Send a message by simply putting message in deque. :param msg: message to send, should be Message type or bytes. - :param raw: if raw is True simply send it as it is, otherwise apply serialization """ with self.lock: self._loopback_buffer.append(msg) - def _cc_receive(self, timeout: float, raw: bool = True) -> Dict[str, MsgType]: + def _cc_receive(self, timeout: float) -> Dict[str, Optional[bytes]]: """Read message by simply removing an element from the left side of deque. :param timeout: timeout applied on receive event - :param raw: if raw is True return raw bytes, otherwise Message type like - :return: Message or raw bytes if successful, otherwise None + :return: dictionary containing the received bytes if successful, otherwise None """ with self.lock: try: diff --git a/src/pykiso/lib/connectors/cc_rtt_segger.py b/src/pykiso/lib/connectors/cc_rtt_segger.py index ab21aab1..404c0f0b 100644 --- a/src/pykiso/lib/connectors/cc_rtt_segger.py +++ b/src/pykiso/lib/connectors/cc_rtt_segger.py @@ -264,15 +264,13 @@ def _cc_close(self) -> None: self.jlink.close() log.internal_info("RTT communication closed") - def _cc_send(self, msg: Message or bytes, raw: bool = False) -> None: + def _cc_send(self, msg: bytes) -> None: """Send message using the corresponding RTT buffer. - :param msg: message to send, should be Message type or bytes. - :param raw: if raw is True simply send it as it is, otherwise apply serialization + :param msg: message to send, should be bytes. """ try: - if not raw: - msg = msg.serialize() + msg = list(msg) bytes_written = self.jlink.rtt_write(self.tx_buffer_idx, msg) log.internal_debug( @@ -287,18 +285,17 @@ def _cc_send(self, msg: Message or bytes, raw: bool = False) -> None: ) def _cc_receive( - self, timeout: float = 0.1, raw: bool = False - ) -> Dict[str, Union[Message, bytes, None]]: + self, timeout: float = 0.1, size: int = None, **kwargs + ) -> Dict[str, Optional[bytes]]: """Read message from the corresponding RTT buffer. :param timeout: timeout applied on receive event - :param raw: if raw is True return raw bytes, otherwise Message type like - - :return: Message or raw bytes if successful, otherwise None + :param size: maximum amount of bytes to read + :return: dictionary containing the received bytes if successful, otherwise None """ is_timeout = False # maximum amount of bytes to read out - size = self.rx_buffer_size if raw else Message().header_size + size = size or self.rx_buffer_size t_start = time.perf_counter() # rtt_read is not a blocking method due to this fact a while loop is used @@ -309,12 +306,6 @@ def _cc_receive( msg_received = self.jlink.rtt_read(self.rx_buffer_idx, size) # if a message is received if msg_received: - if not raw: - # Read the payload and CRC - msg_received += self.jlink.rtt_read( - self.rx_buffer_idx, - msg_received[-1] + Message().crc_byte_size, - ) # Parse the bytes list into bytes string msg_received = bytes(msg_received) log.internal_debug( @@ -323,8 +314,6 @@ def _cc_receive( msg_received, len(msg_received), ) - if not raw: - msg_received = Message.parse_packet(msg_received) break except Exception: log.exception( diff --git a/src/pykiso/lib/connectors/cc_serial.py b/src/pykiso/lib/connectors/cc_serial.py index 78386d6e..fad03344 100644 --- a/src/pykiso/lib/connectors/cc_serial.py +++ b/src/pykiso/lib/connectors/cc_serial.py @@ -123,13 +123,10 @@ def _cc_close(self) -> None: """Close serial port""" self.serial.close() - def _cc_send( - self, msg: ByteString, raw: bool = True, timeout: float = None - ) -> None: + def _cc_send(self, msg: ByteString, timeout: float = None) -> None: """Sends data to the serial port :param msg: data to send - :param raw: unused :param timeout: write timeout in seconds. None sets it to blocking, defaults to None :raises: SerialTimeoutException - In case a write timeout is configured @@ -141,18 +138,15 @@ def _cc_send( self.serial.write(msg) - def _cc_receive(self, timeout=0.00001, raw: bool = True) -> Dict[str, bytes]: + def _cc_receive(self, timeout=0.00001) -> Dict[str, Optional[bytes]]: """Read bytes from the serial port. Try to read one byte in blocking mode. After blocking read check remaining bytes and read them without a blocking call. :param timeout: timeout in seconds, 0 for non blocking read, defaults to 0.00001 - :param raw: raw mode only, defaults to True :raises NotImplementedError: if raw is to True :return: received bytes """ - if not raw: - raise NotImplementedError() self.serial.timeout = timeout diff --git a/src/pykiso/lib/connectors/cc_socket_can/cc_socket_can.py b/src/pykiso/lib/connectors/cc_socket_can/cc_socket_can.py index 57847836..6012b039 100644 --- a/src/pykiso/lib/connectors/cc_socket_can/cc_socket_can.py +++ b/src/pykiso/lib/connectors/cc_socket_can/cc_socket_can.py @@ -23,7 +23,7 @@ import platform import time from pathlib import Path -from typing import Dict, Union +from typing import Dict, Optional, Union import can import can.bus @@ -150,48 +150,33 @@ def _cc_close(self) -> None: del self.logger self.logger = None - def _cc_send(self, msg: MessageType, raw: bool = False, **kwargs) -> None: + def _cc_send( + self, msg: MessageType, remote_id: Optional[int] = None, **kwargs + ) -> None: """Send a CAN message at the configured id. - - If remote_id parameter is not given take configured ones, in addition if - raw is set to True take the msg parameter as it is otherwise parse it using - test entity protocol format. + If remote_id parameter is not given take configured ones :param msg: data to send :param remote_id: destination can id used - :param raw: boolean use to select test entity protocol format """ - _data = msg - remote_id = kwargs.get("remote_id") - - if remote_id is None: - remote_id = self.remote_id - - if not raw: - _data = msg.serialize() + remote_id = remote_id or self.remote_id can_msg = can.Message( arbitration_id=remote_id, - data=_data, + data=msg, is_extended_id=self.is_extended_id, is_fd=self.is_fd, bitrate_switch=self.enable_brs, ) self.bus.send(can_msg) - log.internal_debug(f"{self} sent CAN Message: {can_msg}, data: {_data}") + log.internal_debug(f"{self} sent CAN Message: {can_msg}, data: {msg}") - def _cc_receive( - self, timeout: float = 0.0001, raw: bool = False - ) -> Dict[str, Union[MessageType, int]]: + def _cc_receive(self, timeout: float = 0.0001) -> Dict[str, Union[bytes, int]]: """Receive a can message using configured filters. - If raw parameter is set to True return received message as it is (bytes) - otherwise test entity protocol format is used and Message class type is returned. - :param timeout: timeout applied on reception - :param raw: boolean use to select test entity protocol format :return: tuple containing the received data and the source can id """ @@ -201,8 +186,7 @@ def _cc_receive( frame_id = received_msg.arbitration_id payload = received_msg.data timestamp = received_msg.timestamp - if not raw: - payload = Message.parse_packet(payload) + log.internal_debug( "received CAN Message: {}, {}, {}".format( frame_id, payload, timestamp diff --git a/src/pykiso/lib/connectors/cc_tcp_ip.py b/src/pykiso/lib/connectors/cc_tcp_ip.py index a937c383..a21b4934 100644 --- a/src/pykiso/lib/connectors/cc_tcp_ip.py +++ b/src/pykiso/lib/connectors/cc_tcp_ip.py @@ -20,7 +20,7 @@ """ import logging import socket -from typing import Dict, Union +from typing import Dict, Optional, Union from pykiso import CChannel @@ -62,26 +62,18 @@ def _cc_close(self) -> None: ) self.socket.close() - def _cc_send(self, msg: bytes or str, raw: bool = False) -> None: + def _cc_send(self, msg: bytes or str) -> None: """Send a message via socket. :param msg: message to send - :param raw: is the message in a raw format (True) or is it a string (False)? """ - if msg is not None and raw is False: - msg = msg.encode() - log.internal_debug(f"Sending {msg} via socket to {self.dest_ip}") self.socket.send(msg) - def _cc_receive( - self, timeout=0.01, raw: bool = False - ) -> Dict[str, Union[bytes, str, None]]: + def _cc_receive(self, timeout=0.01) -> Dict[str, Optional[bytes]]: """Read message from socket. :param timeout: time in second to wait for reading a message - :param raw: should the message be returned raw or should it be interpreted as a - pykiso.Message? :return: Message if successful, otherwise none """ @@ -89,10 +81,6 @@ def _cc_receive( try: msg_received = self.socket.recv(self.max_msg_size) - - if not raw: - msg_received = msg_received.decode().strip() - log.internal_debug(f"Socket at {self.dest_ip} received: {msg_received}") except socket.timeout: log.exception( diff --git a/src/pykiso/lib/connectors/cc_uart.py b/src/pykiso/lib/connectors/cc_uart.py index 3137425d..c90e52c1 100644 --- a/src/pykiso/lib/connectors/cc_uart.py +++ b/src/pykiso/lib/connectors/cc_uart.py @@ -21,6 +21,7 @@ import struct import time +from typing import Optional import serial @@ -76,9 +77,8 @@ def _cc_send(self, msg): rawPacket = struct.pack(">H", crc) + rawPacket # Force big endian notation self._send_using_slip(rawPacket) - def _cc_receive(self, timeout=0.00001, raw=False): - if raw: - raise NotImplementedError() + def _cc_receive(self, timeout=0.00001): + self.serial.timeout = timeout or self.timeout receivingState = self.WAITING_FOR_START diff --git a/src/pykiso/lib/connectors/cc_udp.py b/src/pykiso/lib/connectors/cc_udp.py index 1c3e76ff..2ad5a159 100644 --- a/src/pykiso/lib/connectors/cc_udp.py +++ b/src/pykiso/lib/connectors/cc_udp.py @@ -23,9 +23,9 @@ import logging import socket -from typing import Dict, Union +from typing import Dict, Optional -from pykiso import Message, connector +from pykiso import connector log = logging.getLogger(__name__) @@ -60,34 +60,26 @@ def _cc_close(self) -> None: """Close the udp socket.""" self.udp_socket.close() - def _cc_send(self, msg: bytes or Message, raw: bool = False) -> None: + def _cc_send(self, msg: bytes) -> None: """Send message using udp socket - :param msg: message to send, should be Message type or bytes. - :param raw: if raw is True simply send it as it is, otherwise apply serialization + :param msg: message to send, should bytes. """ - if not raw: - msg = msg.serialize() self.udp_socket.sendto(msg, (self.dest_ip, self.dest_port)) - def _cc_receive( - self, timeout: float = 0.0000001, raw: bool = False - ) -> Dict[str, Union[Message, bytes, None]]: + def _cc_receive(self, timeout: float = 0.0000001) -> Dict[str, Optional[bytes]]: """Read message from socket. :param timeout: timeout applied on receive event - :param raw: if raw is True return raw bytes, otherwise Message type like - :return: Message or raw bytes if successful, otherwise None + :return: dictionary containing the received bytes if successful, otherwise None """ self.udp_socket.settimeout(timeout or self.timeout) try: msg_received, self.source_addr = self.udp_socket.recvfrom(self.max_msg_size) - if not raw: - msg_received = Message.parse_packet(msg_received) # catch the errors linked to the socket timeout without blocking except BlockingIOError: log.internal_debug(f"encountered error while receiving message via {self}") diff --git a/src/pykiso/lib/connectors/cc_udp_server.py b/src/pykiso/lib/connectors/cc_udp_server.py index 646eb368..3b63e809 100644 --- a/src/pykiso/lib/connectors/cc_udp_server.py +++ b/src/pykiso/lib/connectors/cc_udp_server.py @@ -25,7 +25,7 @@ """ import logging import socket -from typing import Dict, Union +from typing import Dict, Optional, Union from pykiso import Message, connector @@ -61,25 +61,18 @@ def _cc_close(self) -> None: log.internal_info(f"UDP socket closed at address: {self.address}") self.udp_socket.close() - def _cc_send(self, msg: bytes or Message, raw: bool = False) -> None: + def _cc_send(self, msg: bytes) -> None: """Send back a UDP message to the previous sender. - :param msg: message instance to serialize into bytes + :param msg: message to sent, should be bytes """ - if not raw: - msg = msg.serialize() - log.internal_debug(f"UDP server send: {msg} at {self.address}") self.udp_socket.sendto(msg, self.address) - def _cc_receive( - self, timeout=0.0000001, raw: bool = False - ) -> Dict[str, Union[Message, bytes, None]]: + def _cc_receive(self, timeout=0.0000001) -> Dict[str, Optional[bytes]]: """Read message from UDP socket. :param timeout: timeout applied on receive event - :param raw: should the message be returned raw or should it be interpreted as a - pykiso.Message? :return: Message if successful, otherwise none """ @@ -88,10 +81,6 @@ def _cc_receive( try: msg_received, self.address = self.udp_socket.recvfrom(self.max_msg_size) - - if not raw: - msg_received = Message.parse_packet(msg_received) - log.internal_debug(f"UDP server receives: {msg_received} at {self.address}") # catch the errors linked to the socket timeout without blocking except BlockingIOError: diff --git a/src/pykiso/lib/connectors/cc_usb.py b/src/pykiso/lib/connectors/cc_usb.py index b6cc3882..9c546c27 100644 --- a/src/pykiso/lib/connectors/cc_usb.py +++ b/src/pykiso/lib/connectors/cc_usb.py @@ -30,9 +30,8 @@ class CCUsb(cc_uart.CCUart): def __init__(self, serial_port): super().__init__(serial_port, baudrate=9600) - def _cc_send(self, msg, raw=False): - if raw: - raise NotImplementedError() + def _cc_send(self, msg): + raw_packet = msg.serialize() crc = self._calculate_crc32(raw_packet) diff --git a/src/pykiso/lib/connectors/cc_vector_can.py b/src/pykiso/lib/connectors/cc_vector_can.py index fdc1c476..358fcde6 100644 --- a/src/pykiso/lib/connectors/cc_vector_can.py +++ b/src/pykiso/lib/connectors/cc_vector_can.py @@ -20,7 +20,7 @@ """ import logging -from typing import Dict, Union +from typing import Dict, Optional, Union import can import can.bus @@ -122,30 +122,21 @@ def _cc_close(self) -> None: self.bus.shutdown() self.bus = None - def _cc_send(self, msg, raw: bool = False, **kwargs) -> None: + def _cc_send(self, msg, remote_id: Optional[int] = None, **kwargs) -> None: """Send a CAN message at the configured id. - If remote_id parameter is not given take configured ones, in addition if - raw is set to True take the msg parameter as it is otherwise parse it using - test entity protocol format. + If remote_id parameter is not given take configured ones. :param msg: data to send - :param raw: boolean use to select test entity protocol format - :param kwargs: destination can id used + :param remote_id: destination can id used + :param kwargs: named arguments """ - _data = msg - remote_id = kwargs.get("remote_id") - - if remote_id is None: - remote_id = self.remote_id - - if not raw: - _data = msg.serialize() + remote_id = remote_id or self.remote_id can_msg = can.Message( arbitration_id=remote_id, - data=_data, + data=msg, is_extended_id=self.is_extended_id, is_fd=self.fd, bitrate_switch=self.enable_brs, @@ -154,16 +145,10 @@ def _cc_send(self, msg, raw: bool = False, **kwargs) -> None: log.internal_debug(f"sent CAN Message: {can_msg}") - def _cc_receive( - self, timeout=0.0001, raw: bool = False - ) -> Dict[str, Union[MessageType, int]]: + def _cc_receive(self, timeout=0.0001) -> Dict[str, Union[MessageType, int]]: """Receive a can message using configured filters. - If raw parameter is set to True return received message as it is (bytes) - otherwise test entity protocol format is used and Message class type is returned. - :param timeout: timeout applied on reception - :param raw: boolean use to select test entity protocol format :return: the received data and the source can id """ @@ -173,10 +158,6 @@ def _cc_receive( if received_msg is not None: frame_id = received_msg.arbitration_id payload = received_msg.data - - if not raw: - payload = Message.parse_packet(payload) - log.internal_debug(f"received CAN Message: {frame_id}, {payload}") return {"msg": payload, "remote_id": frame_id} diff --git a/src/pykiso/lib/connectors/cc_visa.py b/src/pykiso/lib/connectors/cc_visa.py index 99e5c4f1..47f73c1d 100644 --- a/src/pykiso/lib/connectors/cc_visa.py +++ b/src/pykiso/lib/connectors/cc_visa.py @@ -21,6 +21,7 @@ import abc import logging +from typing import Dict, Optional import pyvisa @@ -83,39 +84,34 @@ def _process_request(self, request: str, request_data: str = "") -> str: f"Request {request}:{request_data} failed! Timeout expired before operation completed." ) except Exception as e: - log.exception(f"Request {request}:{request_data} failed!\n{e}") + log.exception(f"Request {request}: {request_data} failed!\n{e}") else: log.internal_debug(f"Response received: {recv}") finally: response = {"msg": str(recv)} return response - def _cc_send(self, msg: MsgType, raw: bool = False) -> None: + def _cc_send(self, msg: MsgType) -> None: """Send a write request to the instrument :param msg: message to send - :param raw: is the message in a raw format (True) or is it a string (False)? """ - if raw: - msg = msg.decode() + msg = msg.decode() log.internal_debug(f"Writing {msg} to {self.resource_name}") self.resource.write(msg) - def _cc_receive(self, timeout: float = 0.1, raw: bool = False) -> str: + def _cc_receive(self, timeout: float = 0.1) -> Dict[str, Optional[bytes]]: """Send a read request to the instrument :param timeout: time in second to wait for reading a message - :param raw: should the message be returned raw or should it be interpreted as a - pykiso.Message? + :return: the received response message, or an empty string if the request expired with a timeout. """ - if raw: - return {"msg": self._process_request("read")["msg"].encode()} - else: - return self._process_request("read") + + return {"msg": self._process_request("read")["msg"].encode()} def query(self, query_command: str) -> str: """Send a query request to the instrument diff --git a/src/pykiso/message.py b/src/pykiso/message.py index f937151e..65ce6836 100644 --- a/src/pykiso/message.py +++ b/src/pykiso/message.py @@ -117,14 +117,20 @@ class Message: TYPE: msg_type | message_token | sub_type | errorCode | """ + crc_byte_size = 2 + header_size = 8 + max_payload_size = 0xFF + max_message_size = header_size + max_payload_size + crc_byte_size + reserved = 0 + def __init__( self, msg_type: Union[int, MessageType] = 0, - sub_type=0, + sub_type: int = 0, error_code: int = 0, test_suite: int = 0, test_case: int = 0, - tlv_dict: Optional[Dict] = None, + tlv_dict: Optional[Dict[str, Union[int, bytes]]] = None, ): """Create a generic message. @@ -148,12 +154,9 @@ def __init__( """ self.msg_type = msg_type global msg_cnt - self.crc_byte_size = 2 - self.header_size = 8 self.msg_token = next(msg_cnt) self.sub_type = sub_type self.error_code = error_code - self.reserved = 0 self.test_suite = test_suite self.test_case = test_case self.tlv_dict = tlv_dict @@ -247,7 +250,6 @@ def parse_packet(cls, raw_packet: bytes) -> Message: :return: itself """ msg = cls() - if (not isinstance(raw_packet, bytes)) and ( len(raw_packet) < (msg.header_size + msg.crc_byte_size) ): diff --git a/src/pykiso/test_result/templates/report_template.css b/src/pykiso/test_result/templates/report_template.css index 427cf993..fdc81b09 100644 --- a/src/pykiso/test_result/templates/report_template.css +++ b/src/pykiso/test_result/templates/report_template.css @@ -61,6 +61,11 @@ body { text-transform: uppercase; } + summary { + text-decoration: underline; + cursor: pointer; + } + /* Media Queries*/ @media screen and (max-width: 600px) { diff --git a/src/pykiso/test_result/templates/report_template.html.j2 b/src/pykiso/test_result/templates/report_template.html.j2 index 85c45189..96680473 100644 --- a/src/pykiso/test_result/templates/report_template.html.j2 +++ b/src/pykiso/test_result/templates/report_template.html.j2 @@ -4,32 +4,31 @@ - {# For each TestClass #} + {# For each TestClass -#} {% for class_name, class_content in ALL_STEP_REPORT.items() -%}
- {# Add Title and description #} + {# Add Title and description -#}

ITF Test Report for: {{class_name}} - {% if class_content["succeed"] %} + {% if class_content["succeed"] -%} -> [Success] - {% else %} + {% else -%} -> [Fail] - {% endif %} + {%- endif %}

Test Description:

-

{{class_content["description"] | replace("\n", "
\n") }}

+

{{class_content["description"] | replace("\n", "
\n") }}

Date, Time, Software versions:

- {# Add additional information from header key #} + {# Add additional information from header key -#} {% for data in [class_content["time_result"], class_content["header"]] -%} {% for key, value in data.items() -%} {{key}}: {{value}}
{%- endfor %} {%- endfor %}

- - {# For each test (setUp, run, tearDown) #} - {% for test_name, test_content in class_content["test_list"].items() -%} + {#- For each test (setUp, run, tearDown) -#} + {% for test_name, test_content in class_content["test_list"].items() %} {% set test_success = is_test_success(test_content) -%}

{{test_name}} {% if test_success -%}Success @@ -38,7 +37,7 @@ - {# Set Columns name from the first row, excluding succeed flag #} + {# Set Columns name from the first row, excluding succeed flag -#} {% for column_name in test_content[0].keys() if column_name != "succeed" -%} @@ -46,7 +45,7 @@ - {# Loop over each assert method called (step-report) -#} + {#- Loop over each assert method called (step-report) -#} {% for row in test_content -%} {# Set cell color variable according to the assert result and emptiness -#} {% if row.pop("succeed") -%} @@ -56,13 +55,28 @@ {% set color_cell = "background-color: rgb(236, 160, 160);" -%} {% set color_empty_cell = "background-color: rgb(236, 160, 160);" -%} {%- endif %} + {# Needed for setting unique class per row in html in inner loop -#} + {% set row_index = loop.index -%} - {# Loop over each cell of the row -#} + {#- Loop over each cell of the row -#} {% for col_value in row.values() -%} - {# Set the value and apply colors to the cell -#} + {#- Set the value and apply colors to the cell #} {%- endfor %} @@ -79,3 +93,7 @@ + + diff --git a/src/pykiso/test_result/templates/report_template_script.js b/src/pykiso/test_result/templates/report_template_script.js new file mode 100644 index 00000000..40c20dff --- /dev/null +++ b/src/pykiso/test_result/templates/report_template_script.js @@ -0,0 +1,40 @@ +/** + * synchronize the toggling of all details elements by class name + * class name is unique for each test + row combination and gets rendered in by jinja + * @param {HTMLDetailsElement} element the details element being toggled + * @param {string} className class attribute of all details elements in the row + */ +function toggleDetailsInRow(element, className) { + element.scrollIntoView(); + const isOpen = element.hasAttribute("open"); + const detailsElements = document.getElementsByClassName(className); + for (details of detailsElements) { + details.open = isOpen; + isOpen ? setSummary("Click to minimize", details) : setTruncatedSummary(details); + } +} + +/* helper functions */ + +/** + * set the summary text of the details element + * @param {string} content new content of the summary element + * @param {HTMLDetailsElement} detailsElement parent of the summary being modified + */ +function setSummary(content, detailsElement) { + const summaryElement = detailsElement.getElementsByTagName("summary")[0]; + summaryElement.innerHTML = content; +} + +/** + * same result as jinja truncate when rendering to set summary after closing + * @param {HTMLDetailsElement} detailsElement parent of the summary being modified + * @param {number} length length to cut the content to, default is 50 + * @returns the truncated string + */ +function setTruncatedSummary(detailsElement, length = 50) { + const divElement = detailsElement.getElementsByTagName("div")[0]; + let shortened = divElement.textContent.trim().slice(0, length); + shortened = shortened + "..."; + setSummary(shortened, detailsElement); +} diff --git a/src/pykiso/test_result/text_result.py b/src/pykiso/test_result/text_result.py index c8d23f1d..a0192445 100644 --- a/src/pykiso/test_result/text_result.py +++ b/src/pykiso/test_result/text_result.py @@ -31,6 +31,7 @@ from shutil import get_terminal_size from typing import List, Optional, TextIO, Union from unittest import TextTestResult +from unittest.case import _SubTest from ..test_coordinator.test_case import BasicTest from ..test_coordinator.test_suite import BaseTestSuite @@ -241,6 +242,22 @@ def addError(self, test: Union[BasicTest, BaseTestSuite], err: ExcInfoType) -> N super().addError(test, err) self._error_occurred = True + def addSubTest( + self, + test: Union[BasicTest, BaseTestSuite], + subtest: _SubTest, + err: ExcInfoType, + ) -> None: + """Set the error flag when an error occurs in a subtest. + + :param test: running testcase + :param subtest: subtest runned + :param err: tuple returned by sys.exc_info + """ + super().addSubTest(test, subtest, err) + if err is not None: + self._error_occurred = True + def printErrorList(self, flavour: str, errors: List[tuple]): """Print all errors at the end of the whole tests execution. diff --git a/tests/conftest.py b/tests/conftest.py index 778f12c9..6d8ae01c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,19 +22,11 @@ from pykiso.lib.connectors import cc_example from pykiso.lib.connectors.cc_pcan_can import CCPCanCan from pykiso.lib.connectors.cc_vector_can import CCVectorCan -from pykiso.logging_initializer import ( - LogOptions, - add_logging_level, - get_logging_options, -) +from pykiso.logging_initializer import LogOptions, get_logging_options from pykiso.test_coordinator import test_case from pykiso.test_coordinator.test_case import define_test_parameters from pykiso.test_setup.dynamic_loader import DynamicImportLinker -# add the internal log levels to avoid errors during unit tests execution -add_logging_level("INTERNAL_WARNING", logging.WARNING + 1) -add_logging_level("INTERNAL_INFO", logging.INFO + 1) -add_logging_level("INTERNAL_DEBUG", logging.DEBUG + 1) ## skip slow test by default def pytest_addoption(parser): diff --git a/tests/test_auxiliary.py b/tests/test_auxiliary.py index 7578e266..ca8c733d 100644 --- a/tests/test_auxiliary.py +++ b/tests/test_auxiliary.py @@ -9,7 +9,6 @@ import logging import time -from multiprocessing.connection import deliver_challenge import pytest @@ -31,7 +30,7 @@ class MockAux(AuxiliaryInterface): def __init__(self, param_1=None, param_2=None, **kwargs): self.param_1 = param_1 self.param_2 = param_2 - super().__init__(**kwargs) + AuxiliaryInterface.__init__(self, **kwargs) _create_auxiliary_instance = mocker.stub(name="_create_auxiliary_instance") _delete_auxiliary_instance = mocker.stub(name="_delete_auxiliary_instance") @@ -56,7 +55,7 @@ class MockThreadAux(AuxiliaryInterface): def __init__(self, param_1=None, param_2=None, **kwargs): self.param_1 = param_1 self.param_2 = param_2 - super().__init__(**kwargs) + AuxiliaryInterface.__init__(self, **kwargs) _create_auxiliary_instance = mocker.stub(name="_create_auxiliary_instance") _delete_auxiliary_instance = mocker.stub(name="_delete_auxiliary_instance") @@ -76,7 +75,7 @@ def __init__(self, param_1=None, param_2=None, **kwargs): ) self.param_1 = param_1 self.param_2 = param_2 - super().__init__(name="mp_aux", **kwargs) + MpAuxiliaryInterface.__init__(self, name="mp_aux", **kwargs) _create_auxiliary_instance = mocker.stub(name="_create_auxiliary_instance") _delete_auxiliary_instance = mocker.stub(name="_delete_auxiliary_instance") @@ -91,7 +90,7 @@ def __init__(self, param_1=None, param_2=None, **kwargs): def mock_simple_aux(mocker): class MockSimpleAux(SimpleAuxiliaryInterface): def __init__(self, **kwargs): - super().__init__(**kwargs) + SimpleAuxiliaryInterface.__init__(self, **kwargs) _create_auxiliary_instance = mocker.stub(name="_create_auxiliary_instance") _delete_auxiliary_instance = mocker.stub(name="_delete_auxiliary_instance") diff --git a/tests/test_cc_fdx_lauterbach.py b/tests/test_cc_fdx_lauterbach.py index 5e8992c9..f6650aaf 100644 --- a/tests/test_cc_fdx_lauterbach.py +++ b/tests/test_cc_fdx_lauterbach.py @@ -449,7 +449,7 @@ def test_send_raw_bytes(): mock_t32_api.t32_Fdx_SendPoll = len(msg) lauterbach_inst.t32_api = mock_t32_api - poll_len = lauterbach_inst._cc_send(msg, raw=True) + poll_len = lauterbach_inst._cc_send(msg) assert poll_len == len(msg) @@ -476,7 +476,7 @@ def test_send_message(): mock_t32_api.t32_Fdx_SendPoll = expected_len lauterbach_inst.t32_api = mock_t32_api - poll_len = lauterbach_inst._cc_send(msg) + poll_len = lauterbach_inst._cc_send(msg.serialize()) assert poll_len == expected_len @@ -505,7 +505,7 @@ def test_send_message_exception(caplog, mock_msg): with caplog.at_level( logging.ERROR, ): - poll_len = lauterbach_inst._cc_send(msg, True) + poll_len = lauterbach_inst._cc_send(msg) assert poll_len == -3 assert ( f"ERROR occurred while sending {expected_len} bytes on {lauterbach_inst.fdxout}" diff --git a/tests/test_cc_mp_proxy.py b/tests/test_cc_mp_proxy.py index 1eb1aba4..5a6b74dc 100644 --- a/tests/test_cc_mp_proxy.py +++ b/tests/test_cc_mp_proxy.py @@ -9,6 +9,7 @@ import pytest +from pykiso import message from pykiso.lib.connectors.cc_mp_proxy import CCMpProxy, multiprocessing, queue @@ -18,6 +19,12 @@ def test_constructor(): assert isinstance(con_inst.queue_in, type(multiprocessing.Queue())) assert isinstance(con_inst.queue_out, type(multiprocessing.Queue())) + # pickling quick test + assert con_inst.__getstate__() == con_inst.__dict__ + new_dict = {**con_inst.__dict__, **{"some_attr": 12}} + con_inst.__setstate__(new_dict) + assert con_inst.__getstate__() == new_dict + def test_queue_reference(): con_inst = CCMpProxy() @@ -34,11 +41,10 @@ def test_queue_reference(): def test_cc_send(): with CCMpProxy() as proxy_inst: - proxy_inst._cc_send(b"\x12\x34\x56", raw=True, remote_id=0x500) + proxy_inst._cc_send(b"\x12\x34\x56", remote_id=0x500) arg, kwargs = proxy_inst.queue_in.get() assert arg[0] == b"\x12\x34\x56" - assert kwargs["raw"] == True assert kwargs["remote_id"] == 0x500 @@ -53,8 +59,9 @@ def test_cc_send(): def test_cc_receive(timeout, raw, raw_response): with CCMpProxy() as proxy_inst: proxy_inst.queue_out.put(raw_response) - resp = proxy_inst._cc_receive(timeout, raw) - + resp = proxy_inst._cc_receive(timeout) + if not raw and isinstance(resp["msg"], message.Message): + resp["msg"] = resp["msg"].parse_packet() assert resp["msg"] == raw_response["msg"] assert resp["remote_id"] == raw_response["remote_id"] diff --git a/tests/test_cc_pcan_can.py b/tests/test_cc_pcan_can.py index 66a41af0..30b2e7f6 100644 --- a/tests/test_cc_pcan_can.py +++ b/tests/test_cc_pcan_can.py @@ -451,20 +451,21 @@ def test_cc_close_with_exception( @pytest.mark.parametrize( - "parameters", + "parameters,raw", [ - {"msg": b"\x10\x36", "raw": True, "remote_id": 0x0A}, - {"msg": b"\x10\x36", "raw": True, "remote_id": None}, - {"msg": b"\x10\x36", "raw": True, "remote_id": 10}, - {"msg": b"", "raw": True, "remote_id": 10}, - {"msg": message_with_tlv, "raw": False, "remote_id": 0x0A}, - {"msg": message_with_no_tlv, "raw": False, "remote_id": 0x0A}, - {"msg": message_with_no_tlv}, - {"msg": message_with_no_tlv, "raw": False, "remote_id": 36}, + ({"msg": b"\x10\x36", "remote_id": 0x0A}, True), + ({"msg": b"\x10\x36", "remote_id": None}, True), + ({"msg": b"\x10\x36", "remote_id": 10}, True), + ({"msg": b"", "remote_id": 10}, True), + ({"msg": message_with_tlv, "remote_id": 0x0A}, False), + ({"msg": message_with_no_tlv, "remote_id": 0x0A}, False), + ({"msg": message_with_no_tlv}, False), + ({"msg": message_with_no_tlv, "remote_id": 36}, False), ], ) -def test_cc_send(mock_can_bus, parameters, mock_PCANBasic): - +def test_cc_send(mock_can_bus, parameters, raw, mock_PCANBasic): + if not raw: + parameters["msg"] = parameters.get("msg").serialize() with CCPCanCan(remote_id=0x0A) as can: can._cc_send(**parameters) @@ -473,41 +474,42 @@ def test_cc_send(mock_can_bus, parameters, mock_PCANBasic): @pytest.mark.parametrize( - "raw_data, can_id, cc_receive_param,expected_type", + "raw_data, can_id, timeout, raw,expected_type", [ - (b"\x40\x01\x03\x00\x01\x02\x03\x00", 0x500, (10, False), Message), + (b"\x40\x01\x03\x00\x01\x02\x03\x00", 0x500, 10, False, Message), ( b"\x40\x01\x03\x00\x01\x02\x03\x09\x6e\x02\x4f\x4b\x70\x03\x12\x34\x56", 0x207, - (None, None), + None, + None, Message, ), - (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, (10, True), bytearray), - (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, (0, True), bytearray), + (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, 10, True, bytearray), + (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, 0, True, bytearray), ], ) def test_can_recv( - mocker, mock_can_bus, raw_data, can_id, - cc_receive_param, + timeout, + raw, expected_type, mock_PCANBasic, ): - mock_bus_recv = mocker.patch( - "can.interface.Bus.recv", - return_value=python_can.Message(data=raw_data, arbitration_id=can_id), + mock_can_bus.Bus.recv.return_value = python_can.Message( + data=raw_data, arbitration_id=can_id ) + with CCPCanCan() as can: - response = can._cc_receive(*cc_receive_param) + response = can._cc_receive(timeout) msg_received = response.get("msg") - id_received = response.get("remote_id") - - assert isinstance(msg_received, expected_type) == True - assert id_received == can_id - mock_can_bus.Bus.recv.assert_called_once_with(timeout=cc_receive_param[0] or 1e-6) + if not raw: + msg_received = Message.parse_packet(msg_received) + assert isinstance(msg_received, expected_type) + assert response.get("remote_id") == can_id + mock_can_bus.Bus.recv.assert_called_once_with(timeout=timeout or 1e-6) mock_can_bus.Bus.shutdown.assert_called_once() @@ -523,7 +525,7 @@ def test_can_recv_invalid(mocker, mock_can_bus, raw_state, mock_PCANBasic): mocker.patch("can.interface.Bus.recv", return_value={"msg": None}) with CCPCanCan() as can: - response = can._cc_receive(timeout=0.0001, raw=raw_state) + response = can._cc_receive(timeout=0.0001) assert response["msg"] is None assert response.get("remote_id") is None diff --git a/tests/test_cc_process.py b/tests/test_cc_process.py index 14464fe8..34f32232 100644 --- a/tests/test_cc_process.py +++ b/tests/test_cc_process.py @@ -38,7 +38,7 @@ def test_process(mocker): # sleep 1s # print "hello" on stdout # print "pykiso" on stdout - 'import sys;import time;print(sys.stdin.readline().strip());sys.stdout.flush();time.sleep(1);print(\'error\', file=sys.stderr);sys.stderr.flush();time.sleep(1);print("hello");print("pykiso")', + 'import sys;import time;print(sys.stdin.readline().strip());sys.stdout.flush();time.sleep(0.1);print(\'error\', file=sys.stderr);sys.stderr.flush();time.sleep(0.1);print("hello");print("pykiso")', ], ) # Start the process @@ -48,15 +48,15 @@ def test_process(mocker): cc_process.start() # Receive nothing as process waits for input - assert cc_process.cc_receive(3) == {"msg": None} + assert cc_process.cc_receive(0.1) == {"msg": None} cc_process._cc_send("hi\r\n") - assert cc_process.cc_receive(3) == {"msg": {"stdout": "hi\n"}} - assert cc_process.cc_receive(3) == {"msg": {"stderr": "error\n"}} - assert cc_process.cc_receive(3) == {"msg": {"stdout": "hello\n"}} - assert cc_process.cc_receive(3) == {"msg": {"stdout": "pykiso\n"}} - assert cc_process.cc_receive(3) == {"msg": {"exit": 0}} + assert cc_process.cc_receive(1) == {"msg": {"stdout": "hi\n"}} + assert cc_process.cc_receive(1) == {"msg": {"stderr": "error\n"}} + assert cc_process.cc_receive(1) == {"msg": {"stdout": "hello\n"}} + assert cc_process.cc_receive(1) == {"msg": {"stdout": "pykiso\n"}} + assert cc_process.cc_receive(1) == {"msg": {"exit": 0}} cc_process._cc_close() - assert cc_process.cc_receive(3) == {"msg": None} + assert cc_process.cc_receive(0.1) == {"msg": None} def test_process_binary(mocker): @@ -82,7 +82,7 @@ def test_process_binary(mocker): # print "hello" on stdout # sleep 1s # print "pykiso" on stdout - 'import sys;import time;sys.stdout.write(sys.stdin.readline().strip());sys.stdout.flush();time.sleep(1);sys.stderr.write("error");sys.stderr.flush();time.sleep(1);sys.stdout.write("hello");sys.stdout.flush();time.sleep(1);sys.stdout.write("pykiso")', + 'import sys;import time;sys.stdout.write(sys.stdin.readline().strip());sys.stdout.flush();time.sleep(0.1);sys.stderr.write("error");sys.stderr.flush();time.sleep(0.1);sys.stdout.write("hello");sys.stdout.flush();time.sleep(0.1);sys.stdout.write("pykiso")', ], ) # Start the process @@ -92,13 +92,13 @@ def test_process_binary(mocker): cc_process._cc_send({"command": "start", "executable": "", "args": ""}) cc_process._cc_send(b"hi\n") - assert cc_process.cc_receive(3) == {"msg": {"stdout": b"hi"}} - assert cc_process.cc_receive(3) == {"msg": {"stderr": b"error"}} - assert cc_process.cc_receive(3) == {"msg": {"stdout": b"hello"}} - assert cc_process.cc_receive(3) == {"msg": {"stdout": b"pykiso"}} - assert cc_process.cc_receive(3) == {"msg": {"exit": 0}} + assert cc_process.cc_receive(1) == {"msg": {"stdout": b"hi"}} + assert cc_process.cc_receive(1) == {"msg": {"stderr": b"error"}} + assert cc_process.cc_receive(1) == {"msg": {"stdout": b"hello"}} + assert cc_process.cc_receive(1) == {"msg": {"stdout": b"pykiso"}} + assert cc_process.cc_receive(1) == {"msg": {"exit": 0}} cc_process._cc_close() - assert cc_process.cc_receive(3) == {"msg": None} + assert cc_process.cc_receive(0.1) == {"msg": None} def test_send_without_pipe_exception(mocker): diff --git a/tests/test_cc_proxy.py b/tests/test_cc_proxy.py index 29a6daef..0da01216 100644 --- a/tests/test_cc_proxy.py +++ b/tests/test_cc_proxy.py @@ -47,18 +47,17 @@ def tx_func(con, *args, **kwargs): @pytest.mark.parametrize( - "timeout, raw, raw_response", + "timeout, raw_response", [ - (0.200, True, {"msg": b"\x12\x34\x56", "remote_id": None}), - (0, False, {"msg": b"\x12", "remote_id": 0x500}), - (None, None, {"msg": None, "remote_id": None}), + (0.200, {"msg": b"\x12\x34\x56", "remote_id": None}), + (0, {"msg": b"\x12", "remote_id": 0x500}), + (None, {"msg": None, "remote_id": None}), ], ) -def test_cc_receive(timeout, raw, raw_response): +def test_cc_receive(timeout, raw_response): with CCProxy() as proxy_inst: proxy_inst.queue_out.put(raw_response) - resp = proxy_inst._cc_receive(timeout, raw) - + resp = proxy_inst._cc_receive(timeout) assert resp["msg"] == raw_response["msg"] assert resp["remote_id"] == raw_response["remote_id"] diff --git a/tests/test_cc_rtt_segger.py b/tests/test_cc_rtt_segger.py index 2aabfa23..7f036f65 100644 --- a/tests/test_cc_rtt_segger.py +++ b/tests/test_cc_rtt_segger.py @@ -262,18 +262,22 @@ def test_rtt_segger_rtt_logger_running( ], ) def test_rtt_segger_send(mock_pylink_square_socket, msg_to_send, raw_state): + if not raw_state: + msg_to_send = msg_to_send.serialize() with CCRttSegger() as cc_rtt_inst: - cc_rtt_inst._cc_send(msg=msg_to_send, raw=raw_state) + cc_rtt_inst._cc_send(msg=msg_to_send) mock_pylink_square_socket.JLink.rtt_write.assert_called_once() -def test_rtt_segger_send_error(mock_pylink_square_socket, caplog): +def test_rtt_segger_send_error(mock_pylink_square_socket, mocker, caplog): + with CCRttSegger() as cc_rtt_inst: + mocker.patch.object(cc_rtt_inst.jlink, "rtt_write", side_effect=Exception) with caplog.at_level( logging.ERROR, ): - cc_rtt_inst._cc_send(msg=[0], raw=False) + cc_rtt_inst._cc_send(msg=[0]) assert ( f"ERROR occurred while sending {len([0])} bytes on buffer {cc_rtt_inst.tx_buffer_idx}" in caplog.text @@ -292,19 +296,21 @@ def test_rtt_segger_send_error(mock_pylink_square_socket, caplog): def test_rtt_segger_receive( mocker, mock_pylink_square_socket, timeout, raw, expected_return ): - + size = Message().header_size if not raw else None with CCRttSegger() as cc_rtt_inst: - response = cc_rtt_inst._cc_receive(timeout=timeout, raw=raw) - + response = cc_rtt_inst._cc_receive(timeout=timeout, size=size) assert isinstance(response, dict) - assert isinstance(response["msg"], expected_return) + assert isinstance( + response["msg"] if raw else Message.parse_packet(response["msg"]), + expected_return, + ) def test_rtt_segger_timeout(mocker, mock_pylink_square_socket): mocker.patch("pylink.JLink.rtt_read", return_value=[]) with CCRttSegger() as cc_rtt_inst: - response = cc_rtt_inst._cc_receive(timeout=0.010, raw=True) + response = cc_rtt_inst._cc_receive(timeout=0.010) assert response["msg"] == None diff --git a/tests/test_cc_serial.py b/tests/test_cc_serial.py index cc0c4ce4..71f8697c 100644 --- a/tests/test_cc_serial.py +++ b/tests/test_cc_serial.py @@ -29,16 +29,6 @@ def test_open_default(): assert cc_serial.serial.is_open is False -def test_receive_raw(mocker): - - serial_mock = mocker.patch("serial.Serial") - - cc_serial = CCSerial("com666") - - with pytest.raises(NotImplementedError): - cc_serial.cc_receive(raw=False) - - def test_receive_one_char(mocker): serial_mock = mocker.patch("serial.Serial") @@ -47,7 +37,7 @@ def test_receive_one_char(mocker): cc_serial.serial.read.return_value = b"s" cc_serial.serial.in_waiting = 0 - recv = cc_serial.cc_receive(raw=True, timeout=0.5) + recv = cc_serial.cc_receive(timeout=0.5) assert recv.get("msg") == b"s" cc_serial.serial.read.assert_called_once() assert cc_serial.serial.timeout == 0 @@ -61,7 +51,7 @@ def test_receive_multiple_bytes(mocker): cc_serial.serial.read.side_effect = [b"1", b"234"] cc_serial.serial.in_waiting = 3 - recv = cc_serial.cc_receive(raw=True, timeout=0.5) + recv = cc_serial.cc_receive(timeout=0.5) assert recv.get("msg") == b"1234" assert cc_serial.serial.read.call_count == 2 diff --git a/tests/test_cc_socket_can.py b/tests/test_cc_socket_can.py index e6cfd427..ebfbc4ae 100644 --- a/tests/test_cc_socket_can.py +++ b/tests/test_cc_socket_can.py @@ -191,54 +191,76 @@ def test_cc_close(logging_requested, mock_can_bus, mocker): @pytest.mark.parametrize( - "parameters", + "message,remote_id", [ - {"msg": b"\x10\x36", "remote_id": 0x0A, "raw": True}, - {"msg": b"\x10\x36", "remote_id": None, "raw": True}, - {"msg": b"\x10\x36", "remote_id": 10, "raw": True}, - {"msg": b"", "remote_id": 10, "raw": True}, - {"msg": message_with_tlv, "remote_id": 0x0A, "raw": False}, - {"msg": message_with_no_tlv, "remote_id": 0x0A, "raw": False}, - {"msg": message_with_no_tlv}, - {"msg": message_with_no_tlv, "remote_id": 36, "raw": False}, + ( + b"\x10\x36", + 0x0A, + ), + ( + b"\x10\x36", + None, + ), + ( + b"\x10\x36", + 10, + ), + ( + b"", + 10, + ), + ( + message_with_tlv, + 0x0A, + ), + ( + message_with_no_tlv, + 0x0A, + ), + (message_with_no_tlv, None), + (message_with_no_tlv, 36), ], ) -def test_cc_send(mock_can_bus, parameters): - +def test_cc_send(mock_can_bus, message, remote_id): + if isinstance(message, Message): + message = message.serialize() with CCSocketCan(remote_id=0x0A) as can: - can._cc_send(**parameters) + can._cc_send(message, remote_id=remote_id) mock_can_bus.Bus.send.assert_called_once() mock_can_bus.Bus.shutdown.assert_called_once() @pytest.mark.parametrize( - "raw_data, can_id, cc_receive_param, expected_type", + "raw_data, can_id, timeout , raw ,expected_type", [ - (b"\x40\x01\x03\x00\x01\x02\x03\x00", 0x500, (10, False), Message), + (b"\x40\x01\x03\x00\x01\x02\x03\x00", 0x500, 10, False, Message), ( b"\x40\x01\x03\x00\x01\x02\x03\x09\x6e\x02\x4f\x4b\x70\x03\x12\x34\x56", 0x207, - (None, None), + None, + None, Message, ), - (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, (10, True), bytearray), - (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, (0, True), bytearray), + (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, 10, True, bytearray), + (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, 0, True, bytearray), ], ) -def test_can_recv( - mocker, mock_can_bus, raw_data, can_id, cc_receive_param, expected_type -): +def test_can_recv(mocker, mock_can_bus, raw_data, can_id, timeout, raw, expected_type): mock_bus_recv = mocker.patch( "can.interface.Bus.recv", return_value=python_can.Message(data=raw_data, arbitration_id=can_id), ) with CCSocketCan() as can: - recv_msg = can._cc_receive(*cc_receive_param) - - assert isinstance(recv_msg["msg"], expected_type) - assert recv_msg["remote_id"] == can_id - mock_can_bus.Bus.recv.assert_called_once_with(timeout=cc_receive_param[0] or 1e-6) + response = can._cc_receive(timeout) + msg_received = response.get("msg") + id_received = response.get("remote_id") + + if not raw and msg_received is not None: + msg_received = Message.parse_packet(msg_received) + assert isinstance(msg_received, expected_type) + assert id_received == can_id + mock_can_bus.Bus.recv.assert_called_once_with(timeout=timeout or 1e-6) mock_can_bus.Bus.shutdown.assert_called_once() @@ -254,10 +276,15 @@ def test_can_recv_invalid(mocker, mock_can_bus, raw_state): mocker.patch("can.interface.Bus.recv", return_value=None) with CCSocketCan() as can: - recv_msg = can._cc_receive(timeout=0.0001, raw=raw_state) - - assert recv_msg["msg"] is None - assert recv_msg.get("remote_id") is None + response = can._cc_receive( + timeout=0.0001, + ) + msg_received = response.get("msg") + id_received = response.get("remote_id") + if not raw_state and msg_received is not None: + msg_received = msg_received.parse_packet() + assert msg_received is None + assert id_received is None def test_can_recv_exception(mocker, mock_can_bus): diff --git a/tests/test_cc_tcp_ip.py b/tests/test_cc_tcp_ip.py index bb21f4b9..5c690151 100644 --- a/tests/test_cc_tcp_ip.py +++ b/tests/test_cc_tcp_ip.py @@ -115,8 +115,9 @@ def test__cc_send( """Test _cc_send""" param = constructor_params.values() socket_connector = cc_tcp_ip.CCTcpip(*param) - - socket_connector._cc_send(msg_to_send, is_raw) + if not is_raw: + msg_to_send = msg_to_send.encode() + socket_connector._cc_send(msg_to_send) mock_socket.socket.socket.send.assert_called_once_with(expected_sent_message) @@ -135,10 +136,10 @@ def test__cc_receive( """Test _cc_receive""" param = constructor_params.values() socket_connector = cc_tcp_ip.CCTcpip(*param) - - assert expected_response == socket_connector._cc_receive( - timeout=timeout, raw=is_raw - ) + response = socket_connector._cc_receive(timeout=timeout) + if not is_raw: + response["msg"] = response.get("msg").decode().strip() + assert expected_response == response socket_connector.socket.settimeout.assert_called_once_with(timeout or 1e-6) diff --git a/tests/test_cc_udp.py b/tests/test_cc_udp.py index f46d5bae..028bfa89 100644 --- a/tests/test_cc_udp.py +++ b/tests/test_cc_udp.py @@ -94,21 +94,21 @@ def test_udp_close(mock_udp_socket): @pytest.mark.parametrize( - "msg_to_send, raw_state", + "msg_to_send", [ - (message_with_tlv, False), - (message_with_no_tlv, False), - (b"\x40\x01\x03\x00\x01\x02\x03\x00", True), + (message_with_tlv), + (message_with_no_tlv), + (b"\x40\x01\x03\x00\x01\x02\x03\x00"), ], ) -def test_udp_send_valid(mock_udp_socket, msg_to_send, raw_state): +def test_udp_send_valid(mock_udp_socket, msg_to_send): """Test message _cc_send method using context manager from Connector class. Validation criteria: - sendto is call once """ with CCUdp("120.0.0.7", 5005) as udp_inst: - udp_inst._cc_send(msg_to_send, raw=raw_state) + udp_inst._cc_send(msg_to_send) mock_udp_socket.socket.sendto.assert_called_once() @@ -145,18 +145,9 @@ def test_udp_recv_invalid(mocker, mock_udp_socket, expected_exception, caplog): @pytest.mark.parametrize( "raw_data, cc_receive_param, expected_type", [ - ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 1002), (10, False), Message), - ( - ( - b"\x40\x01\x03\x00\x01\x02\x03\x09\x6e\x02\x4f\x4b\x70\x03\x12\x34\x56", - 4000, - ), - (None, None), - Message, - ), - ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 36), (10, True), bytes), - ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 36), (0, True), bytes), - ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 36), (None, True), bytes), + ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 36), (10), bytes), + ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 36), (0), bytes), + ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 36), (None), bytes), ], ) def test_udp_recv_valid( @@ -172,12 +163,10 @@ def test_udp_recv_valid( mocker.patch("socket.socket.recvfrom", return_value=raw_data) with CCUdp("120.0.0.7", 5005) as udp_inst: - msg_received = udp_inst._cc_receive(*cc_receive_param) + msg_received = udp_inst._cc_receive(cc_receive_param) assert isinstance(msg_received, dict) assert isinstance(msg_received["msg"], expected_type) assert udp_inst.source_addr == raw_data[1] - mock_udp_socket.socket.settimeout.assert_called_once_with( - cc_receive_param[0] or 1e-6 - ) + mock_udp_socket.socket.settimeout.assert_called_once_with(cc_receive_param or 1e-6) mock_udp_socket.socket.recvfrom.assert_called_once() diff --git a/tests/test_cc_udp_server.py b/tests/test_cc_udp_server.py index c8295d88..6c96493b 100644 --- a/tests/test_cc_udp_server.py +++ b/tests/test_cc_udp_server.py @@ -102,30 +102,31 @@ def test_udp_server_send(mock_udp_socket, msg_to_send): - sendto is call once """ with CCUdpServer("120.0.0.7", 5005) as udp_server: - udp_server._cc_send(msg_to_send) + udp_server._cc_send(msg_to_send.serialize()) mock_udp_socket.socket.sendto.assert_called_once() @pytest.mark.parametrize( - "raw_data, cc_receive_param, expected_type", + "raw_data, timeout, raw, expected_type", [ - ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 5050), (10, False), Message), + ((b"\x40\x01\x03\x00\x01\x02\x03\x00", 5050), (10), (False), Message), ( ( b"\x40\x01\x03\x00\x01\x02\x03\x09\x6e\x02\x4f\x4b\x70\x03\x12\x34\x56", 2002, ), - (None, None), + (None), + (None), Message, ), - ((b"\x40\x01\x03\x00\x02\x03\x00", 5050), (10, True), bytes), - ((b"\x40\x01\x03\x00\x02\x03\x00", 5050), (0, True), bytes), - ((b"\x40\x01\x03\x00\x02\x03\x00", 5050), (None, True), bytes), + ((b"\x40\x01\x03\x00\x02\x03\x00", 5050), (10), (True), bytes), + ((b"\x40\x01\x03\x00\x02\x03\x00", 5050), (0), (True), bytes), + ((b"\x40\x01\x03\x00\x02\x03\x00", 5050), (None), (True), bytes), ], ) def test_udp_server_recv_valid( - mocker, mock_udp_socket, raw_data, cc_receive_param, expected_type + mocker, mock_udp_socket, raw_data, timeout, raw, expected_type ): """Test message _cc_receive method using context manager from Connector class. @@ -137,13 +138,14 @@ def test_udp_server_recv_valid( mocker.patch("socket.socket.recvfrom", return_value=(raw_data[0], raw_data[1])) with CCUdpServer("120.0.0.7", 5005) as udp_server: - msg_received = udp_server._cc_receive(*cc_receive_param) + msg_received = udp_server._cc_receive(timeout) assert isinstance(msg_received, dict) - assert isinstance(msg_received["msg"], expected_type) - mock_udp_socket.socket.settimeout.assert_called_once_with( - cc_receive_param[0] or 1e-6 - ) + msg_received = msg_received.get("msg") + if not raw: + msg_received = Message.parse_packet(msg_received) + assert isinstance(msg_received, expected_type) + mock_udp_socket.socket.settimeout.assert_called_once_with(timeout or 1e-6) mock_udp_socket.socket.recvfrom.assert_called_once() diff --git a/tests/test_cc_vector_can.py b/tests/test_cc_vector_can.py index d0f3f706..0f10e7f0 100644 --- a/tests/test_cc_vector_can.py +++ b/tests/test_cc_vector_can.py @@ -158,20 +158,21 @@ def test_cc_close(mock_can_bus): @pytest.mark.parametrize( - "parameters", + "parameters ,raw", [ - {"msg": b"\x10\x36", "raw": True, "remote_id": 0x0A}, - {"msg": b"\x10\x36", "raw": True, "remote_id": None}, - {"msg": b"\x10\x36", "raw": True, "remote_id": 10}, - {"msg": b"", "raw": True, "remote_id": 10}, - {"msg": message_with_tlv, "raw": False, "remote_id": 0x0A}, - {"msg": message_with_no_tlv, "raw": False, "remote_id": 0x0A}, - {"msg": message_with_no_tlv}, - {"msg": message_with_no_tlv, "raw": False, "remote_id": 36}, + ({"msg": b"\x10\x36", "remote_id": 0x0A}, True), + ({"msg": b"\x10\x36", "remote_id": None}, True), + ({"msg": b"\x10\x36", "remote_id": 10}, True), + ({"msg": b"", "remote_id": 10}, True), + ({"msg": message_with_tlv, "remote_id": 0x0A}, False), + ({"msg": message_with_no_tlv, "remote_id": 0x0A}, False), + ({"msg": message_with_no_tlv}, False), + ({"msg": message_with_no_tlv, "remote_id": 36}, False), ], ) -def test_cc_send(mock_can_bus, parameters): - +def test_cc_send(mock_can_bus, parameters, raw): + if not raw: + parameters["msg"] = parameters.get("msg").serialize() with CCVectorCan() as can: can.remote_id = 0x500 can._cc_send(**parameters) @@ -181,47 +182,48 @@ def test_cc_send(mock_can_bus, parameters): @pytest.mark.parametrize( - "raw_data, can_id, cc_receive_param,expected_type", + "raw_data, can_id, timeout , raw,expected_type", [ - (b"\x40\x01\x03\x00\x01\x02\x03\x00", 0x500, (10, False), Message), + (b"\x40\x01\x03\x00\x01\x02\x03\x00", 0x500, 10, False, Message), ( b"\x40\x01\x03\x00\x01\x02\x03\x09\x6e\x02\x4f\x4b\x70\x03\x12\x34\x56", 0x207, - (None, None), + None, + None, Message, ), - (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, (10, True), bytearray), - (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, (0, True), bytearray), + (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, 10, True, bytearray), + (b"\x40\x01\x03\x00\x02\x03\x00", 0x502, 0, True, bytearray), ], ) -def test_can_recv( - mocker, mock_can_bus, raw_data, can_id, cc_receive_param, expected_type -): +def test_can_recv(mocker, mock_can_bus, raw_data, can_id, timeout, raw, expected_type): mocker.patch( "can.interface.Bus.recv", return_value=python_can.Message(data=raw_data, arbitration_id=can_id), ) with CCVectorCan() as can: - response = can._cc_receive(*cc_receive_param) + response = can._cc_receive(timeout) msg_received = response.get("msg") id_received = response.get("remote_id") + if not raw: + msg_received = Message.parse_packet(msg_received) assert isinstance(msg_received, expected_type) == True assert id_received == can_id - mock_can_bus.Bus.recv.assert_called_once_with(timeout=cc_receive_param[0] or 1e-6) + mock_can_bus.Bus.recv.assert_called_once_with(timeout=timeout or 1e-6) mock_can_bus.Bus.shutdown.assert_called_once() @pytest.mark.parametrize( - "raw_state, side_effect_value, expected_log", + " side_effect_value, expected_log", [ - (True, None, ""), - (False, BaseException, "encountered error while receiving message via"), + (None, ""), + (BaseException, "encountered error while receiving message via"), ], ) def test_can_recv_invalid( - mocker, mock_can_bus, raw_state, side_effect_value, caplog, expected_log + mocker, mock_can_bus, side_effect_value, caplog, expected_log ): mocker.patch( @@ -230,7 +232,7 @@ def test_can_recv_invalid( with caplog.at_level(logging.ERROR): with CCVectorCan() as can: - response = can._cc_receive(timeout=0.0001, raw=raw_state) + response = can._cc_receive(timeout=0.0001) assert response["msg"] is None assert response.get("remote_id") is None diff --git a/tests/test_cc_visa.py b/tests/test_cc_visa.py index d3d692fe..5791665a 100644 --- a/tests/test_cc_visa.py +++ b/tests/test_cc_visa.py @@ -12,6 +12,7 @@ import pyvisa from pykiso.lib.connectors import cc_visa +from pykiso.message import Message constructor_params_serial = {"serial_port": 4, "baud_rate": 9600} constructor_params_tcpip = { @@ -109,17 +110,14 @@ def test_cc_close(constructor_params_serial): assert visa_inst._cc_close() is None -@pytest.mark.parametrize( - "constructor_params_serial, raw_value, expected_return", - [ - (constructor_params_serial, True, {"msg": b"read response"}), - (constructor_params_serial, False, {"msg": "read response"}), - ], -) -def test__cc_receive(mocker, constructor_params_serial, raw_value, expected_return): +def test_cc_receive( + mocker, +): visa_inst = cc_visa.VISASerial(constructor_params_serial) mocker.patch.object(visa_inst, "resource", new=MockSerialInstrument(serial_port=3)) - assert visa_inst._cc_receive(raw=raw_value) == expected_return + message_received = visa_inst._cc_receive() + + assert message_received == {"msg": b"read response"} @pytest.mark.parametrize( @@ -175,10 +173,10 @@ def test__cc_send(mocker, constructor_params_serial, scpi_cmds): mocker.patch.object(visa_inst, "resource", new=MockSerialInstrument(serial_port=3)) # Test with string input - assert visa_inst._cc_send(scpi_cmds["example"]) is None + assert visa_inst._cc_send(scpi_cmds["example"].encode()) is None # Test with bytes input - assert visa_inst._cc_send(b"example", raw=True) is None + assert visa_inst._cc_send(b"example") is None @pytest.mark.parametrize( diff --git a/tests/test_com_aux.py b/tests/test_com_aux.py index 2a41a34b..b4bc327e 100644 --- a/tests/test_com_aux.py +++ b/tests/test_com_aux.py @@ -81,7 +81,7 @@ def test_com_aux_send_message_without_contextmanager(mocker, caplog, com_aux_ins com_aux_inst.send_message(msg) - assert channel_cc_send_mock.call_args[1] == {"msg": b"test", "raw": True} + assert channel_cc_send_mock.call_args[1] == {"msg": b"test"} com_aux_inst.delete_instance() diff --git a/tests/test_connector_interface.py b/tests/test_connector_interface.py index 5bb846b6..6e7f1ac1 100644 --- a/tests/test_connector_interface.py +++ b/tests/test_connector_interface.py @@ -7,6 +7,7 @@ # SPDX-License-Identifier: EPL-2.0 ########################################################################## +import logging import multiprocessing import threading @@ -84,14 +85,39 @@ def test_channel_cc_send(channel_obj): cc_inst = channel_obj(name="thread-channel") cc_inst.cc_send(msg=b"\x01\x02\x03") - cc_inst._cc_send.assert_called_with(msg=b"\x01\x02\x03", raw=False) + cc_inst._cc_send.assert_called_with(msg=b"\x01\x02\x03") + + +def test_channel_cc_send_raw(channel_obj, caplog): + cc_inst = channel_obj(name="thread-channel") + + with caplog.at_level(logging.WARNING): + cc_inst.cc_send(msg=b"\x01\x02\x03", raw=True) + assert ( + "Use of 'raw' keyword argument is deprecated. It won't be passed to '_cc_send'." + in caplog.text + ) + + cc_inst._cc_send.assert_called_with(msg=b"\x01\x02\x03", raw=True) def test_channel_cc_receive(channel_obj): cc_inst = channel_obj(name="thread-channel") cc_inst.cc_receive() - cc_inst._cc_receive.assert_called_with(timeout=0.1, raw=False) + cc_inst._cc_receive.assert_called_with(timeout=0.1) + + +def test_channel_cc_receive_raw(channel_obj, caplog): + cc_inst = channel_obj(name="thread-channel") + + with caplog.at_level(logging.WARNING): + cc_inst.cc_receive(raw=True) + assert ( + "Use of 'raw' keyword argument is deprecated. It won't be passed to '_cc_receive'." + in caplog.text + ) + cc_inst._cc_receive.assert_called_with(timeout=0.1, raw=True) def test_channel_invalid_interface(): diff --git a/tests/test_dut_auxiliary.py b/tests/test_dut_auxiliary.py index 96351212..a73eaa71 100644 --- a/tests/test_dut_auxiliary.py +++ b/tests/test_dut_auxiliary.py @@ -9,6 +9,7 @@ import logging import threading +from typing import Optional import pytest @@ -47,9 +48,7 @@ def test_create_auxiliary_instance(aux_inst): def test_create_auxiliary_instance_flasher_fail(mocker, aux_inst): mocker.patch.object(aux_inst.flash, "open", side_effect=AssertionError) - state = aux_inst._create_auxiliary_instance() - assert state is False @@ -324,10 +323,10 @@ def test_wait_and_get_report_queue_empty(aux_inst): def test__run_command(mocker, aux_inst): send_mock = mocker.patch.object(aux_inst.channel, "_cc_send") + message_send = message.Message() + aux_inst._run_command(cmd_message=message_send, cmd_data=None) - aux_inst._run_command(cmd_message="abcde", cmd_data=None) - - send_mock.assert_called_with(msg="abcde", raw=False) + send_mock.assert_called_with(msg=message_send.serialize()) def test__run_command_exception(mocker, aux_inst, caplog): @@ -344,9 +343,11 @@ def test__receive_message(mocker, aux_inst): response = message.Message(MESSAGE_TYPE.LOG, COMMAND_TYPE.TEST_SUITE_RUN) send_mock = mocker.patch.object(aux_inst.channel, "_cc_send") recv_mock = mocker.patch.object( - aux_inst.channel, "_cc_receive", return_value={"msg": response} + aux_inst.channel, "_cc_receive", return_value={"msg": response.serialize()} + ) + parse_mock = mocker.patch.object( + message.Message, "parse_packet", return_value=response ) - aux_inst._receive_message(timeout_in_s=0) send_mock.assert_called_once() @@ -374,9 +375,11 @@ def test__receive_message_failed_ack(mocker, aux_inst): aux_inst.channel, "_cc_send", side_effect=AttributeError ) recv_mock = mocker.patch.object( - aux_inst.channel, "_cc_receive", return_value={"msg": response} + aux_inst.channel, "_cc_receive", return_value={"msg": response.serialize()} + ) + parse_mock = mocker.patch.object( + message.Message, "parse_packet", return_value=response ) - aux_inst._receive_message(timeout_in_s=0) recv_mock.assert_called_once() @@ -387,9 +390,11 @@ def test__receive_message_response_is_ack(mocker, aux_inst): response = message.Message(MESSAGE_TYPE.ACK) send_mock = mocker.patch.object(aux_inst.channel, "_cc_send") recv_mock = mocker.patch.object( - aux_inst.channel, "_cc_receive", return_value={"msg": response} + aux_inst.channel, "_cc_receive", return_value={"msg": response.serialize()} + ) + parse_mock = mocker.patch.object( + message.Message, "parse_packet", return_value=response ) - aux_inst._receive_message(timeout_in_s=0) send_mock.assert_not_called() diff --git a/tests/test_instrument_control_auxiliary.py b/tests/test_instrument_control_auxiliary.py index 36f9f8ca..1c39370d 100644 --- a/tests/test_instrument_control_auxiliary.py +++ b/tests/test_instrument_control_auxiliary.py @@ -114,7 +114,7 @@ def test_handle_write_without_validation(aux_inst, cchannel_inst): req = "SYST:LOCK ON" response = aux_inst.handle_write(req) - cchannel_inst._cc_send.assert_called_with(msg="SYST:LOCK ON\n", raw=False) + cchannel_inst._cc_send.assert_called_with(msg="SYST:LOCK ON\n") assert response == "NO_VALIDATION" @@ -129,7 +129,7 @@ def test_handle_write_with_validation(mocker, aux_inst, cchannel_inst): response = aux_inst.handle_write(req, validation) - cchannel_inst._cc_send.assert_called_with(msg="SYST:LOCK ON\n", raw=False) + cchannel_inst._cc_send.assert_called_with(msg="SYST:LOCK ON\n") handle_query_mock.assert_called_with(validation[0]) assert response == "SUCCESS" @@ -194,7 +194,7 @@ def test_handle_read(aux_inst, cchannel_inst): aux_inst.handle_read() - cchannel_inst._cc_receive.assert_called_with(timeout=0.1, raw=False) + cchannel_inst._cc_receive.assert_called_with(timeout=0.1) def test_handle_query(mocker, aux_inst, cchannel_inst): @@ -203,9 +203,9 @@ def test_handle_query(mocker, aux_inst, cchannel_inst): aux_inst.handle_query(query) cchannel_inst._cc_send.assert_called_with( - msg=f"{query}{aux_inst.write_termination}", raw=False + msg=f"{query}{aux_inst.write_termination}", ) - cchannel_inst._cc_receive.assert_called_with(timeout=0.1, raw=False) + cchannel_inst._cc_receive.assert_called_with(timeout=0.1) def test_handle_query_with_visa_cc(mocker, aux_inst, cc_visa_inst): diff --git a/tests/test_mp_proxy_auxiliary.py b/tests/test_mp_proxy_auxiliary.py index b4d63b21..a05cd4cd 100644 --- a/tests/test_mp_proxy_auxiliary.py +++ b/tests/test_mp_proxy_auxiliary.py @@ -6,6 +6,7 @@ # # SPDX-License-Identifier: EPL-2.0 ########################################################################## +import importlib import logging import queue import sys @@ -14,6 +15,7 @@ import pytest +import pykiso from pykiso.connector import CChannel from pykiso.interfaces.thread_auxiliary import AuxiliaryInterface from pykiso.lib.auxiliaries.mp_proxy_auxiliary import ( @@ -21,16 +23,22 @@ MpProxyAuxiliary, TraceOptions, ) +from pykiso.lib.connectors.cc_mp_proxy import CCMpProxy +from pykiso.lib.connectors.cc_proxy import CCProxy + +pykiso.logging_initializer.log_options = pykiso.logging_initializer.LogOptions( + ".", "INFO", "TEXT", False +) @pytest.fixture def mock_auxiliaries(mocker): - class MockProxyCChannel(CChannel): + class MockProxyCChannel(CCMpProxy): def __init__(self, name=None, *args, **kwargs): + super(MockProxyCChannel, self).__init__(*args, **kwargs) self.name = name - self.queue_in = queue.Queue() self.queue_out = queue.Queue() - super(MockProxyCChannel, self).__init__(*args, **kwargs) + self.queue_in = queue.Queue() _cc_open = mocker.stub(name="_cc_open") open = mocker.stub(name="open") @@ -161,7 +169,9 @@ def test_init_trace_not_activated(mocker, mp_proxy_auxiliary_inst): def test_get_proxy_con_pre_load(mocker, mp_proxy_auxiliary_inst, caplog): - mock_check_comp = mocker.patch.object(MpProxyAuxiliary, "_check_compatibility") + mock_check_comp = mocker.patch.object( + MpProxyAuxiliary, "_check_channels_compatibility" + ) mock_get_alias = mocker.patch.object( ConfigRegistry, "get_auxes_alias", return_value="later_aux" @@ -172,9 +182,13 @@ class Linker: def __init__(self): self._aux_cache = AuxCache() + class FakeCCMpProxy(CCMpProxy): + def _bind_channel_info(self, *args, **kwargs): + pass + class FakeAux: def __init__(self): - self.channel = True + self.channel = FakeCCMpProxy() self.is_proxy_capable = True class AuxCache: @@ -183,33 +197,64 @@ def get_instance(self, aux_name): ConfigRegistry._linker = Linker() - with caplog.at_level( - logging.WARNING, - ): - + with caplog.at_level(logging.WARNING): result_get_proxy = mp_proxy_auxiliary_inst.get_proxy_con(["later_aux"]) + assert ( "Auxiliary : later_aux is not using import magic mechanism (pre-loaded)" in caplog.text ) assert len(result_get_proxy) == 1 - assert isinstance(result_get_proxy[0], bool) + assert isinstance(result_get_proxy[0], FakeCCMpProxy) mock_check_comp.assert_called() mock_get_alias.get_called() -def test_get_proxy_con_valid_(mocker, mp_proxy_auxiliary_inst, mock_auxiliaries): +def test_get_proxy_con_valid(mocker, mp_proxy_auxiliary_inst, mock_auxiliaries): mock_check_comp = mocker.patch.object(MpProxyAuxiliary, "_check_compatibility") + mock_check_channel_comp = mocker.patch.object( + MpProxyAuxiliary, "_check_channels_compatibility" + ) AUX_LIST_NAMES = ["MockAux1", "MockAux2"] result_get_proxy = mp_proxy_auxiliary_inst.get_proxy_con(AUX_LIST_NAMES) - assert len(result_get_proxy) == 2 + assert len(result_get_proxy) == len(AUX_LIST_NAMES) assert all(isinstance(items, CChannel) for items in result_get_proxy) - mock_check_comp.assert_called() + mock_check_channel_comp.assert_called_once() + assert mock_check_comp.call_count == len(AUX_LIST_NAMES) + + +def test_get_proxy_con_invalid_cchannel(mocker, caplog, mp_proxy_auxiliary_inst): + mock_get_alias = mocker.patch.object( + ConfigRegistry, "get_auxes_alias", return_value="later_aux" + ) + mp_proxy_auxiliary_inst.aux_list = ["later_aux"] + + class Linker: + def __init__(self): + self._aux_cache = AuxCache() + + class OtherCChannel(CChannel): + def _bind_channel_info(self, *args, **kwargs): + pass + class FakeAux: + def __init__(self): + self.channel = OtherCChannel() + self.is_proxy_capable = True + + class AuxCache: + def get_instance(self, aux_name): + return FakeAux() -def test_get_proxy_con_invalid_(mocker, caplog, mp_proxy_auxiliary_inst): + ConfigRegistry._linker = Linker() + + with pytest.raises(TypeError): + mp_proxy_auxiliary_inst.get_proxy_con(["later_aux"]) + + +def test_get_proxy_con_invalid_aux(mocker, caplog, mp_proxy_auxiliary_inst): mock_get_alias = mocker.patch.object( ConfigRegistry, "get_auxes_alias", return_value="later_aux" ) @@ -226,6 +271,52 @@ def test_get_proxy_con_invalid_(mocker, caplog, mp_proxy_auxiliary_inst): mock_get_alias.assert_called() +def test_getattr_physical_cchannel( + mocker, cchannel_inst, mp_proxy_auxiliary_inst, mock_auxiliaries +): + mocker.patch.object(MpProxyAuxiliary, "_check_compatibility") + mocker.patch.object(MpProxyAuxiliary, "_check_channels_compatibility") + + cchannel_inst.some_attribute = object() + + AUX_LIST_NAMES = ["MockAux1", "MockAux2"] + proxy_inst = MpProxyAuxiliary(cchannel_inst, AUX_LIST_NAMES, name="aux") + proxy_inst.lock = mocker.MagicMock() + + mock_aux1 = importlib.import_module("pykiso.auxiliaries.MockAux1") + + assert isinstance(proxy_inst.channel, type(cchannel_inst)) + + assert mock_aux1.channel._physical_channel is cchannel_inst + + # attribute exists in the physical channel + assert mock_aux1.channel.some_attribute is cchannel_inst.some_attribute + proxy_inst.lock.__enter__.assert_called_once() + proxy_inst.lock.__exit__.assert_called_once() + proxy_inst.lock.reset_mock() + + # attribute exists in the proxy channel + assert mock_aux1.channel.cc_send is not cchannel_inst.cc_send + proxy_inst.lock.__enter__.assert_not_called() + proxy_inst.lock.__exit__.assert_not_called() + + # attribute does not exist in physical channel + with pytest.raises( + AttributeError, match="object has no attribute 'does_not_exist'" + ): + mock_aux1.channel.does_not_exist + proxy_inst.lock.__enter__.assert_called_once() + proxy_inst.lock.__exit__.assert_called_once() + + # attribute does not exist in proxy channel (no physical channel attached) + proxy_inst.lock.reset_mock() + mock_aux1.channel._physical_channel = None + with pytest.raises(AttributeError, match="has no attribute 'does_not_exist'"): + mock_aux1.channel.does_not_exist + proxy_inst.lock.__enter__.assert_not_called() + proxy_inst.lock.__exit__.assert_not_called() + + def test_create_auxiliary_instance(mp_proxy_auxiliary_inst, caplog): with caplog.at_level(logging.INFO): @@ -325,6 +416,7 @@ def test_dispatch_command_invalid(mp_proxy_auxiliary_inst, mock_auxiliaries): def test_run_command(mocker, mp_proxy_auxiliary_inst, mock_auxiliaries): mock_dispatch_command = mocker.patch.object(MpProxyAuxiliary, "_dispatch_command") mocker.patch.object(MpProxyAuxiliary, "_check_compatibility") + mocker.patch.object(MpProxyAuxiliary, "_check_channels_compatibility") mocker.patch.object(mp_proxy_auxiliary_inst, "channel") mock_queue_empty = mocker.patch("queue.Queue.empty", return_value=False) mock_queue_get = mocker.patch( diff --git a/tests/test_proxy_auxiliary.py b/tests/test_proxy_auxiliary.py index e2967def..6c067b6e 100644 --- a/tests/test_proxy_auxiliary.py +++ b/tests/test_proxy_auxiliary.py @@ -7,6 +7,7 @@ # SPDX-License-Identifier: EPL-2.0 ########################################################################## +import importlib import logging import queue import sys @@ -78,9 +79,7 @@ def __init__(self, param_1=None, param_2=None, **kwargs): self.param_1 = param_1 self.param_2 = param_2 self.channel = mock_auxiliaries - super().__init__( - name="mp_aux", - ) + super().__init__(name="mp_aux") _create_auxiliary_instance = mocker.stub(name="_create_auxiliary_instance") _create_auxiliary_instance.return_value = True @@ -128,11 +127,17 @@ def test_init_trace_not_activate(mocker): def test_get_proxy_con_valid(mocker, cchannel_inst, mock_aux_interface): - mocker.patch.object(ProxyAuxiliary, "_check_aux_compatibility") - mocker.patch.object(ProxyAuxiliary, "_check_channels_compatibility") + mock_check_aux = mocker.patch.object(ProxyAuxiliary, "_check_aux_compatibility") + mock_check_channels = mocker.patch.object( + ProxyAuxiliary, "_check_channels_compatibility" + ) + mock_bind_channel = mocker.patch.object(CCProxy, "_bind_channel_info") proxy_inst = ProxyAuxiliary(cchannel_inst, [*AUX_LIST_NAMES, mock_aux_interface]) + mock_check_channels.assert_called_once() + assert mock_check_aux.call_count == len([*AUX_LIST_NAMES, mock_aux_interface]) + assert mock_bind_channel.call_count == len([*AUX_LIST_NAMES, mock_aux_interface]) assert len(proxy_inst.proxy_channels) == 3 assert all(isinstance(items, CCProxy) for items in proxy_inst.proxy_channels) @@ -149,17 +154,66 @@ def test_get_proxy_con_invalid(mocker, caplog, cchannel_inst): assert len(proxy_inst.proxy_channels) == 0 +def test_getattr_physical_cchannel( + mocker, cchannel_inst, mock_aux_interface, mock_auxiliaries +): + mocker.patch.object(ProxyAuxiliary, "_check_aux_compatibility") + mocker.patch.object(ProxyAuxiliary, "_check_channels_compatibility") + + cchannel_inst.some_attribute = object() + + proxy_inst = ProxyAuxiliary(cchannel_inst, [*AUX_LIST_NAMES, mock_aux_interface]) + proxy_inst.lock = mocker.MagicMock() + + mock_aux1 = importlib.import_module("pykiso.auxiliaries.MockAux1") + + assert isinstance(mock_aux1.channel, CCProxy) + assert isinstance(proxy_inst.channel, type(cchannel_inst)) + + assert mock_aux1.channel._physical_channel is cchannel_inst + + # attribute exists in the physical channel instance + assert mock_aux1.channel.some_attribute is cchannel_inst.some_attribute + proxy_inst.lock.__enter__.assert_called_once() + proxy_inst.lock.__exit__.assert_called_once() + proxy_inst.lock.reset_mock() + + # attribute exists in the proxy channel instance + assert mock_aux1.channel.cc_send is not cchannel_inst.cc_send + proxy_inst.lock.__enter__.assert_not_called() + proxy_inst.lock.__exit__.assert_not_called() + + # attribute does not exist in physical channel + with pytest.raises(AttributeError, match="has no attribute 'does_not_exist'"): + mock_aux1.channel.does_not_exist + proxy_inst.lock.__enter__.assert_called_once() + proxy_inst.lock.__exit__.assert_called_once() + + # attribute does not exist in proxy channel (no physical channel attached) + proxy_inst.lock.reset_mock() + mock_aux1.channel._physical_channel = None + with pytest.raises(AttributeError, match="has no attribute 'does_not_exist'"): + mock_aux1.channel.does_not_exist + proxy_inst.lock.__enter__.assert_not_called() + proxy_inst.lock.__exit__.assert_not_called() + + def test_get_proxy_con_pre_load(mocker, cchannel_inst): mocker.patch.object(ConfigRegistry, "get_auxes_alias", return_value="later_aux") mocker.patch.object(ProxyAuxiliary, "_check_channels_compatibility") + mocker.patch.object(CCProxy, "_bind_channel_info") class Linker: def __init__(self): self._aux_cache = AuxCache() + class FakeCCProxy: + def _bind_channel_info(self, *args, **kwargs): + pass + class FakeAux: def __init__(self): - self.channel = True + self.channel = FakeCCProxy() self.is_proxy_capable = True class AuxCache: @@ -171,7 +225,7 @@ def get_instance(self, aux_name): proxy_inst = ProxyAuxiliary(cchannel_inst, ["later_aux"]) assert len(proxy_inst.proxy_channels) == 1 - assert isinstance(proxy_inst.proxy_channels[0], bool) + assert isinstance(proxy_inst.proxy_channels[0], FakeCCProxy) def test_check_aux_compatibility_exception(mocker, cchannel_inst): diff --git a/tests/test_simulated_auxiliary.py b/tests/test_simulated_auxiliary.py index b6a7f6f1..bc990eea 100644 --- a/tests/test_simulated_auxiliary.py +++ b/tests/test_simulated_auxiliary.py @@ -236,10 +236,9 @@ def test_virtual_cfg_output(capsys, prepare_config): """ cfg = cli.parse_config(prepare_config) with pytest.raises(SystemExit): - config_registry = ConfigRegistry(cfg) - config_registry.register_aux_con() + ConfigRegistry.register_aux_con(cfg) exit_code = test_execution.execute(cfg) - config_registry.delete_aux_con() + ConfigRegistry.delete_aux_con() sys.exit(exit_code) output = capsys.readouterr() diff --git a/tests/test_text_result.py b/tests/test_text_result.py index 6f9d23fb..5a09a63b 100644 --- a/tests/test_text_result.py +++ b/tests/test_text_result.py @@ -13,7 +13,7 @@ import pytest -from pykiso.test_result.text_result import ResultStream +from pykiso.test_result.text_result import BannerTestResult, ResultStream DUMMY_FILE = "dummy.txt" @@ -94,3 +94,24 @@ def test_close(self, test_result_instance): assert test_result_instance.stderr is None assert test_result_instance.file is None + + +class TestBannerTestResult: + @pytest.fixture() + def banner_test_result_instance(self): + return BannerTestResult(sys.stderr, True, 1) + + @pytest.mark.parametrize( + "error,result_expected", [((Exception), True), (None, False)] + ) + def test_addSubTest( + self, mocker, banner_test_result_instance, error, result_expected + ): + add_subTest_mock = mocker.patch("unittest.result.TestResult.addSubTest") + test_mock = mocker.patch("pykiso.test_coordinator.test_case.BasicTest") + subtest_mock = mocker.patch("unittest.case._SubTest") + + banner_test_result_instance.addSubTest(test_mock, subtest_mock, error) + + add_subTest_mock.assert_called_once_with(test_mock, subtest_mock, error) + assert banner_test_result_instance.error_occurred == result_expected diff --git a/tests/test_uds_server_auxiliary.py b/tests/test_uds_server_auxiliary.py index 595cee25..550b6446 100644 --- a/tests/test_uds_server_auxiliary.py +++ b/tests/test_uds_server_auxiliary.py @@ -108,9 +108,7 @@ def test_transmit(self, mocker, uds_server_aux_inst, req_id, expected_req_id): uds_server_aux_inst.transmit(data, req_id) mock_pad.assert_called_with(data) - mock_channel._cc_send.assert_called_with( - msg=data, remote_id=expected_req_id, raw=True - ) + mock_channel._cc_send.assert_called_with(msg=data, remote_id=expected_req_id) @pytest.mark.parametrize( "cc_receive_return, expected_received_data", @@ -130,7 +128,7 @@ def test_receive( received_data = uds_server_aux_inst.receive() - uds_server_aux_inst.channel._cc_receive.assert_called_with(timeout=0, raw=True) + uds_server_aux_inst.channel._cc_receive.assert_called_with(timeout=0) assert received_data == expected_received_data def test_send_response(self, mocker, uds_server_aux_inst): @@ -262,7 +260,7 @@ def test__receive_message(self, mocker, uds_server_aux_inst): uds_server_aux_inst._receive_message(10) - mock_channel.cc_receive.assert_called_once_with(10, raw=True) + mock_channel.cc_receive.assert_called_once_with(10) mock_uds_config.tp.decode_isotp.assert_called_once_with( received_data=b"DATA", use_external_snd_rcv_functions=True ) @@ -283,7 +281,7 @@ def test__receive_message_exception(self, caplog, mocker, uds_server_aux_inst): with caplog.at_level(logging.ERROR): uds_server_aux_inst._receive_message(10) - mock_channel.cc_receive.assert_called_once_with(10, raw=True) + mock_channel.cc_receive.assert_called_once_with(10) mock_uds_config.tp.decode_isotp.assert_called_once_with( received_data=b"DATA", use_external_snd_rcv_functions=True )
Step{{column_name}}
{{loop.index}} - {{col_value}} + {# Use a
if content is too long for better results overview and synchronize toggling them per row -#} + {% if col_value | string | length > 100 -%} +
+ {{col_value | string | truncate(53, true, leeway = 0)}} +
+
+ {{col_value}} +
+
+ {%- else -%} +
+ {{col_value}} +
+ {%- endif %}