Skip to content

Commit

Permalink
Send HighPriority Message to flush VectorBus Tx buffer (#1636)
Browse files Browse the repository at this point in the history
* Refactor flush_tx_queue for VectorBus

* Remove test_flush_tx_buffer_mocked

Implementation cannot be tested due to Vector TX queue being not accessible

* Update formatting in flush_tx_queue

* Add tests for VectorBus flush_tx_queue

* Refactor docstring for VectorBus flush_tx_queue
  • Loading branch information
deronek authored Oct 8, 2023
1 parent 237f2be commit e3d912b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
35 changes: 34 additions & 1 deletion can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,40 @@ def _build_xl_can_tx_event(msg: Message) -> xlclass.XLcanTxEvent:
return xl_can_tx_event

def flush_tx_buffer(self) -> None:
self.xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask)
"""
Flush the TX buffer of the bus.
Implementation does not use function ``xlCanFlushTransmitQueue`` of the XL driver, as it works only
for XL family devices.
.. warning::
Using this function will flush the queue and send a high voltage message (ID = 0, DLC = 0, no data).
"""
if self._can_protocol is CanProtocol.CAN_FD:
xl_can_tx_event = xlclass.XLcanTxEvent()
xl_can_tx_event.tag = xldefine.XL_CANFD_TX_EventTags.XL_CAN_EV_TAG_TX_MSG
xl_can_tx_event.tagData.canMsg.msgFlags |= (
xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_HIGHPRIO
)

self.xldriver.xlCanTransmitEx(
self.port_handle,
self.mask,
ctypes.c_uint(1),
ctypes.c_uint(0),
xl_can_tx_event,
)
else:
xl_event = xlclass.XLevent()
xl_event.tag = xldefine.XL_EventTags.XL_TRANSMIT_MSG
xl_event.tagData.msg.flags |= (
xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_OVERRUN
| xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_WAKEUP
)

self.xldriver.xlCanTransmit(
self.port_handle, self.mask, ctypes.c_uint(1), xl_event
)

def shutdown(self) -> None:
super().shutdown()
Expand Down
33 changes: 32 additions & 1 deletion test/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,38 @@ def test_receive_fd_non_msg_event() -> None:
def test_flush_tx_buffer_mocked(mock_xldriver) -> None:
bus = can.Bus(channel=0, interface="vector", _testing=True)
bus.flush_tx_buffer()
can.interfaces.vector.canlib.xldriver.xlCanFlushTransmitQueue.assert_called()
transmit_args = can.interfaces.vector.canlib.xldriver.xlCanTransmit.call_args[0]

num_msg = transmit_args[2]
assert num_msg.value == ctypes.c_uint(1).value

event = transmit_args[3]
assert isinstance(event, xlclass.XLevent)
assert event.tag & xldefine.XL_EventTags.XL_TRANSMIT_MSG
assert event.tagData.msg.flags & (
xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_OVERRUN
| xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_WAKEUP
)


def test_flush_tx_buffer_fd_mocked(mock_xldriver) -> None:
bus = can.Bus(channel=0, interface="vector", fd=True, _testing=True)
bus.flush_tx_buffer()
transmit_args = can.interfaces.vector.canlib.xldriver.xlCanTransmitEx.call_args[0]

num_msg = transmit_args[2]
assert num_msg.value == ctypes.c_uint(1).value

num_msg_sent = transmit_args[3]
assert num_msg_sent.value == ctypes.c_uint(0).value

event = transmit_args[4]
assert isinstance(event, xlclass.XLcanTxEvent)
assert event.tag & xldefine.XL_CANFD_TX_EventTags.XL_CAN_EV_TAG_TX_MSG
assert (
event.tagData.canMsg.msgFlags
& xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_HIGHPRIO
)


@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable")
Expand Down

0 comments on commit e3d912b

Please sign in to comment.