diff --git a/.github/workflows/hil-circuitpython.yml b/.github/workflows/hil-circuitpython.yml index 7d7e490..3de7a97 100644 --- a/.github/workflows/hil-circuitpython.yml +++ b/.github/workflows/hil-circuitpython.yml @@ -21,6 +21,9 @@ on: type: boolean default: true + schedule: + - cron: '30 4 * * 1' + jobs: test: runs-on: [self-hosted, linux, circuitpython, swan-3.0, notecard-serial] diff --git a/Makefile b/Makefile index be6237e..a1009a3 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ flake8: # F403 'from module import *' used; unable to detect undefined names https://www.flake8rules.com/rules/F403.html # W503 Line break occurred before a binary operator https://www.flake8rules.com/rules/W503.html # E501 Line too long (>79 characters) https://www.flake8rules.com/rules/E501.html - ${PYTHON} -m flake8 test/ notecard/ examples/ mpy_board/ --count --ignore=E722,F401,F403,W503,E501 --show-source --statistics + ${PYTHON} -m flake8 --exclude=notecard/md5.py test/ notecard/ examples/ mpy_board/ --count --ignore=E722,F401,F403,W503,E501,E502 --show-source --statistics coverage: ${RUN_VENV_ACTIVATE} diff --git a/examples/binary-mode/binary_loopback_example.py b/examples/binary-mode/binary_loopback_example.py new file mode 100644 index 0000000..9898b70 --- /dev/null +++ b/examples/binary-mode/binary_loopback_example.py @@ -0,0 +1,110 @@ +"""note-python binary loopback example. + +This example writes an array of bytes to the binary data store on a Notecard and +reads them back. It checks that what was written exactly matches what's read +back. + +Supports MicroPython, CircuitPython, and Raspberry Pi (Linux). +""" +import sys + + +def run_example(product_uid, use_uart=True): + """Connect to Notcard and run a binary loopback test.""" + tx_buf = bytearray([ + 0x67, 0x48, 0xa8, 0x1e, 0x9f, 0xbb, 0xb7, 0x27, 0xbb, 0x31, 0x89, 0x00, 0x1f, + 0x60, 0x49, 0x8a, 0x63, 0xa1, 0x2b, 0xac, 0xb8, 0xa9, 0xb0, 0x59, 0x71, 0x65, + 0xdd, 0x87, 0x73, 0x8a, 0x06, 0x9d, 0x40, 0xc1, 0xee, 0x24, 0xca, 0x31, 0xee, + 0x88, 0xf7, 0xf1, 0x23, 0x60, 0xf2, 0x01, 0x98, 0x39, 0x21, 0x18, 0x25, 0x3c, + 0x36, 0xf7, 0x93, 0xae, 0x50, 0xd6, 0x7d, 0x93, 0x55, 0xff, 0xcb, 0x56, 0xd3, + 0xd3, 0xd5, 0xe9, 0xf0, 0x60, 0xf7, 0xe9, 0xd3, 0xa4, 0x40, 0xe7, 0x8a, 0x71, + 0x72, 0x8b, 0x28, 0x5d, 0x57, 0x57, 0x8c, 0xc3, 0xd4, 0xe2, 0x05, 0xfa, 0x98, + 0xd2, 0x26, 0x4f, 0x5d, 0xb3, 0x08, 0x02, 0xf2, 0x50, 0x23, 0x5d, 0x9c, 0x6e, + 0x63, 0x7e, 0x03, 0x22, 0xa5, 0xb3, 0x5e, 0x95, 0xf2, 0x74, 0xfd, 0x3c, 0x2d, + 0x06, 0xf8, 0xdc, 0x34, 0xe4, 0x3d, 0x42, 0x47, 0x7c, 0x61, 0xe6, 0xe1, 0x53 + ]) + + biggest_notecard_response = 400 + binary_chunk_size = 32 + uart_rx_buf_size = biggest_notecard_response + binary_chunk_size + + if sys.implementation.name == 'micropython': + from machine import UART + from machine import I2C + from machine import Pin + import board + + if use_uart: + port = UART(board.UART, 9600) + port.init(9600, bits=8, parity=None, stop=1, + timeout=3000, timeout_char=100, rxbuf=uart_rx_buf_size) + else: + port = I2C(board.I2C_ID, scl=Pin(board.SCL), sda=Pin(board.SDA)) + elif sys.implementation.name == 'circuitpython': + import busio + import board + + if use_uart: + port = busio.UART(board.TX, board.RX, baudrate=9600, receiver_buffer_size=uart_rx_buf_size) + else: + port = busio.I2C(board.SCL, board.SDA) + else: + import os + + sys.path.insert(0, os.path.abspath( + os.path.join(os.path.dirname(__file__), '..'))) + + from periphery import I2C + import serial + + if use_uart: + port = serial.Serial('/dev/ttyACM0', 9600) + else: + port = I2C('/dev/i2c-1') + + import notecard + from notecard import binary_helpers + + if use_uart: + card = notecard.OpenSerial(port, debug=True) + else: + card = notecard.OpenI2C(port, 0, 0, debug=True) + + print('Clearing out any old data...') + binary_helpers.binary_store_reset(card) + + print('Sending buffer...') + binary_helpers.binary_store_transmit(card, tx_buf, 0) + print(f'Sent {len(tx_buf)} bytes to the Notecard.') + + print('Reading it back...') + rx_buf = bytearray() + + left = binary_helpers.binary_store_decoded_length(card) + offset = 0 + while left > 0: + chunk_size = left if binary_chunk_size > left else binary_chunk_size + chunk = binary_helpers.binary_store_receive(card, offset, chunk_size) + rx_buf.extend(chunk) + left -= chunk_size + offset += chunk_size + + print(f'Received {len(rx_buf)} bytes from the Notecard.') + + print('Checking if received matches transmitted...') + rx_len = len(rx_buf) + tx_len = len(tx_buf) + assert rx_len == tx_len, f'Length mismatch between sent and received data. Sent {tx_len} bytes. Received {rx_len} bytes.' + + for idx, (tx_byte, rx_byte) in enumerate(zip(tx_buf, rx_buf)): + assert tx_byte == rx_byte, f'Data mismatch detected at index {idx}. Sent: {tx_byte}. Received: {rx_byte}.' + + print('Received matches transmitted.') + print('Example complete.') + + +if __name__ == '__main__': + product_uid = 'com.your-company.your-project' + # Choose either UART or I2C for Notecard + use_uart = True + run_example(product_uid, use_uart) diff --git a/examples/notecard-basics/cpy_example.py b/examples/notecard-basics/cpy_example.py index 5ab42ec..bf6db92 100644 --- a/examples/notecard-basics/cpy_example.py +++ b/examples/notecard-basics/cpy_example.py @@ -14,20 +14,6 @@ import busio # noqa: E402 -def NotecardExceptionInfo(exception): - """Construct a formatted Exception string. - - Args: - exception (Exception): An exception object. - - Returns: - string: a summary of the exception with line number and details. - """ - name = exception.__class__.__name__ - return sys.platform + ": " + name \ - + ": " + " ".join(map(str, exception.args)) - - def configure_notecard(card, product_uid): """Submit a simple JSON-based request to the Notecard. @@ -39,11 +25,7 @@ def configure_notecard(card, product_uid): req["product"] = product_uid req["mode"] = "continuous" - try: - card.Transaction(req) - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + card.Transaction(req) def get_temp_and_voltage(card): @@ -53,20 +35,13 @@ def get_temp_and_voltage(card): card (object): An instance of the Notecard class """ - temp = 0 - voltage = 0 - - try: - req = {"req": "card.temp"} - rsp = card.Transaction(req) - temp = rsp["value"] + req = {"req": "card.temp"} + rsp = card.Transaction(req) + temp = rsp["value"] - req = {"req": "card.voltage"} - rsp = card.Transaction(req) - voltage = rsp["value"] - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + req = {"req": "card.voltage"} + rsp = card.Transaction(req) + voltage = rsp["value"] return temp, voltage @@ -75,7 +50,8 @@ def run_example(product_uid, use_uart=True): """Connect to Notcard and run a transaction test.""" print("Opening port...") if use_uart: - port = busio.UART(board.TX, board.RX, baudrate=9600) + port = busio.UART(board.TX, board.RX, baudrate=9600, + receiver_buffer_size=128) else: port = busio.I2C(board.SCL, board.SDA) diff --git a/examples/notecard-basics/mpy_example.py b/examples/notecard-basics/mpy_example.py index ba0c68e..a1cee74 100644 --- a/examples/notecard-basics/mpy_example.py +++ b/examples/notecard-basics/mpy_example.py @@ -16,20 +16,6 @@ from machine import Pin -def NotecardExceptionInfo(exception): - """Construct a formatted Exception string. - - Args: - exception (Exception): An exception object. - - Returns: - string: a summary of the exception with line number and details. - """ - name = exception.__class__.__name__ - return sys.platform + ": " + name + ": " \ - + " ".join(map(str, exception.args)) - - def configure_notecard(card, product_uid): """Submit a simple JSON-based request to the Notecard. @@ -41,11 +27,7 @@ def configure_notecard(card, product_uid): req["product"] = product_uid req["mode"] = "continuous" - try: - card.Transaction(req) - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + card.Transaction(req) def get_temp_and_voltage(card): @@ -55,20 +37,13 @@ def get_temp_and_voltage(card): card (object): An instance of the Notecard class """ - temp = 0 - voltage = 0 - - try: - req = {"req": "card.temp"} - rsp = card.Transaction(req) - temp = rsp["value"] - - req = {"req": "card.voltage"} - rsp = card.Transaction(req) - voltage = rsp["value"] - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + req = {"req": "card.temp"} + rsp = card.Transaction(req) + temp = rsp["value"] + + req = {"req": "card.voltage"} + rsp = card.Transaction(req) + voltage = rsp["value"] return temp, voltage diff --git a/notecard/binary_helpers.py b/notecard/binary_helpers.py new file mode 100644 index 0000000..c15fe85 --- /dev/null +++ b/notecard/binary_helpers.py @@ -0,0 +1,145 @@ +"""Helper methods for doing binary transfers to/from a Notecard.""" + +import sys +from .cobs import cobs_encode, cobs_decode +from .notecard import Notecard, CARD_INTRA_TRANSACTION_TIMEOUT_SEC + +BINARY_RETRIES = 2 + +if sys.implementation.name == 'cpython': + import hashlib + + def _md5_hash(data): + """Create an MD5 digest of the given data.""" + return hashlib.md5(data).hexdigest() +else: + from .md5 import digest as _md5_hash + + +def binary_store_decoded_length(card: Notecard): + """Get the length of the decoded binary data store.""" + rsp = card.Transaction({'req': 'card.binary'}) + # Ignore {bad-bin} errors, but fail on other types of errors. + if 'err' in rsp and '{bad-bin}' not in rsp['err']: + raise Exception( + f'Error in response to card.binary request: {rsp["err"]}.') + + return rsp['length'] if 'length' in rsp else 0 + + +def binary_store_reset(card: Notecard): + """Reset the binary data store.""" + rsp = card.Transaction({'req': 'card.binary', 'delete': True}) + if 'err' in rsp: + raise Exception( + f'Error in response to card.binary delete request: {rsp["err"]}.') + + +def binary_store_transmit(card: Notecard, data: bytearray, offset: int): + """Write bytes to index `offset` of the binary data store.""" + # Make a copy of the data to transmit. We do not modify the user's passed in + # `data` object. + tx_data = bytearray(data) + rsp = card.Transaction({'req': 'card.binary'}) + + # Ignore `{bad-bin}` errors, because we intend to overwrite the data. + if 'err' in rsp and '{bad-bin}' not in rsp['err']: + raise Exception(rsp['err']) + + if 'max' not in rsp or rsp['max'] == 0: + raise Exception(('Unexpected card.binary response: max is zero or not ' + 'present.')) + + curr_len = rsp['length'] if 'length' in rsp else 0 + if offset != curr_len: + raise Exception('Notecard data length is misaligned with offset.') + + max_len = rsp['max'] + remaining = max_len - curr_len if offset > 0 else max_len + if len(tx_data) > remaining: + raise Exception(('Data to transmit won\'t fit in the Notecard\'s binary' + ' store.')) + + encoded = cobs_encode(tx_data, ord('\n')) + req = { + 'req': 'card.binary.put', + 'cobs': len(encoded), + 'status': _md5_hash(tx_data) + } + encoded.append(ord('\n')) + if offset > 0: + req['offset'] = offset + + tries = 1 + BINARY_RETRIES + while tries > 0: + try: + # We need to hold the lock for both the card.binary.put transaction + # and the subsequent transmission of the binary data. + card.lock() + + # Pass lock=false because we're already locked. + rsp = card.Transaction(req, lock=False) + if 'err' in rsp: + raise Exception(rsp['err']) + + # Send the binary data. + card.transmit(encoded, delay=False) + finally: + card.unlock() + + rsp = card.Transaction({'req': 'card.binary'}) + if 'err' in rsp: + # Retry on {bad-bin} errors. + if '{bad-bin}' in rsp['err']: + tries -= 1 + + if card._debug and tries > 0: + print('Error during binary transmission, retrying...') + # Fail on all other error types. + else: + raise Exception(rsp['err']) + else: + break + + if tries == 0: + raise Exception('Failed to transmit binary data.') + + +def binary_store_receive(card, offset: int, length: int): + """Receive `length' bytes from index `offset` of the binary data store.""" + req = { + 'req': 'card.binary.get', + 'offset': offset, + 'length': length + } + try: + # We need to hold the lock for both the card.binary.get transaction + # and the subsequent receipt of the binary data. + card.lock() + + # Pass lock=false because we're already locked. + rsp = card.Transaction(req, lock=False) + if 'err' in rsp: + raise Exception(rsp['err']) + + # Receive the binary data, keeping everything except the last byte, + # which is a newline. + try: + encoded = card.receive(delay=False)[:-1] + except Exception as e: + # Queue up a reset if there was an issue receiving the binary data. + # The reset will attempt to drain the binary data from the Notecard + # so that the comms channel with the Notecard is clean before the + # next transaction. + card._reset_required = True + raise e + + finally: + card.unlock() + + decoded = cobs_decode(encoded, ord('\n')) + + if _md5_hash(decoded) != rsp['status']: + raise Exception('Computed MD5 does not match received MD5.') + + return decoded diff --git a/notecard/cobs.py b/notecard/cobs.py new file mode 100644 index 0000000..295b345 --- /dev/null +++ b/notecard/cobs.py @@ -0,0 +1,53 @@ +"""Methods for COBS encoding and decoding arbitrary bytearrays.""" + + +def cobs_encode(data: bytearray, eop: int) -> bytearray: + """COBS encode an array of bytes, using eop as the end of packet marker.""" + cobs_overhead = 1 + (len(data) // 254) + encoded = bytearray(len(data) + cobs_overhead) + code = 1 + idx = 0 + code_idx = idx + idx += 1 + + for byte in data: + if byte != 0: + encoded[idx] = byte ^ eop + idx += 1 + code += 1 + if byte == 0 or code == 0xFF: + encoded[code_idx] = code ^ eop + code = 1 + code_idx = idx + idx += 1 + + encoded[code_idx] = code ^ eop + + return encoded[:idx] + + +def cobs_decode(encoded: bytes, eop: int) -> bytearray: + """COBS decode an array of bytes, using eop as the end of packet marker.""" + decoded = bytearray(len(encoded)) + idx = 0 + copy = 0 + code = 0xFF + + for byte in encoded: + if copy != 0: + decoded[idx] = byte ^ eop + idx += 1 + else: + if code != 0xFF: + decoded[idx] = 0 + idx += 1 + + copy = byte ^ eop + code = copy + + if code == 0: + break + + copy -= 1 + + return decoded[:idx] diff --git a/notecard/crc32.py b/notecard/crc32.py new file mode 100644 index 0000000..4a2d208 --- /dev/null +++ b/notecard/crc32.py @@ -0,0 +1,33 @@ +"""Module for computing the CRC32 of arbitrary data.""" + +crc32_lookup_table = [ + 0x00000000, 0x1DB71064, 0x3B6E20C8, 0x26D930AC, 0x76DC4190, 0x6B6B51F4, + 0x4DB26158, 0x5005713C, 0xEDB88320, 0xF00F9344, 0xD6D6A3E8, 0xCB61B38C, + 0x9B64C2B0, 0x86D3D2D4, 0xA00AE278, 0xBDBDF21C +] + + +def _logical_rshift(val, shift_amount, num_bits=32): + """Logcally right shift `val` by `shift_amount` bits. + + Logical right shift (i.e. right shift that fills with 0s instead of the + sign bit) isn't supported natively in Python. This is a simple + implementation. See: + https://realpython.com/python-bitwise-operators/#arithmetic-vs-logical-shift + """ + unsigned_val = val % (1 << num_bits) + return unsigned_val >> shift_amount + + +def crc32(data): + """Compute CRC32 of the given data. + + Small lookup-table half-byte CRC32 algorithm based on: + https://create.stephan-brumme.com/crc32/#half-byte + """ + crc = ~0 + for idx in range(len(data)): + crc = crc32_lookup_table[(crc ^ data[idx]) & 0x0F] ^ _logical_rshift(crc, 4) + crc = crc32_lookup_table[(crc ^ _logical_rshift(data[idx], 4)) & 0x0F] ^ _logical_rshift(crc, 4) + + return ~crc & 0xffffffff diff --git a/notecard/md5.py b/notecard/md5.py new file mode 100644 index 0000000..fb23e3a --- /dev/null +++ b/notecard/md5.py @@ -0,0 +1,90 @@ +"""Module for computing MD5 hash for MicroPython and CircuitPython.""" + +""" +Copyright [2018] [Mauro Riva ] +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + + +based on https://rosettacode.org/wiki/MD5/Implementation#Python +adapted for MicroPython + +Adapted by Hayden Roche for use by Blues in note-python. +""" + +import sys + + +# CPython already has MD5 available. It's MicroPython and CircuitPython where +# MD5 from hashlib may or may not be available, depending on the build of the +# firmware, so we provide our own implementation. +if sys.implementation.name != 'cpython': + rotate_amounts = [7, 12, 17, 22, 7, 12, 17, 22, 7, 12, 17, 22, 7, 12, 17, 22, + 5, 9, 14, 20, 5, 9, 14, 20, 5, 9, 14, 20, 5, 9, 14, 20, + 4, 11, 16, 23, 4, 11, 16, 23, 4, 11, 16, 23, 4, 11, 16, 23, + 6, 10, 15, 21, 6, 10, 15, 21, 6, 10, 15, 21, 6, 10, 15, 21] + + #constants = [int(abs(math.sin(i+1)) * 2**32) & 0xFFFFFFFF for i in range(64)] # precision is not enough + constants = [3614090360, 3905402710, 606105819, 3250441966, 4118548399, 1200080426, 2821735955, 4249261313, + 1770035416, 2336552879, 4294925233, 2304563134, 1804603682, 4254626195, 2792965006, 1236535329, + 4129170786, 3225465664, 643717713, 3921069994, 3593408605, 38016083, 3634488961, 3889429448, + 568446438, 3275163606, 4107603335, 1163531501, 2850285829, 4243563512, 1735328473, 2368359562, + 4294588738, 2272392833, 1839030562, 4259657740, 2763975236, 1272893353, 4139469664, 3200236656, + 681279174, 3936430074, 3572445317, 76029189, 3654602809, 3873151461, 530742520, 3299628645, + 4096336452, 1126891415, 2878612391, 4237533241, 1700485571, 2399980690, 4293915773, 2240044497, + 1873313359, 4264355552, 2734768916, 1309151649, 4149444226, 3174756917, 718787259, 3951481745] + + init_values = [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476] + + functions = 16*[lambda b, c, d: (b & c) | (~b & d)] + \ + 16*[lambda b, c, d: (d & b) | (~d & c)] + \ + 16*[lambda b, c, d: b ^ c ^ d] + \ + 16*[lambda b, c, d: c ^ (b | ~d)] + + index_functions = 16*[lambda i: i] + \ + 16*[lambda i: (5*i + 1)%16] + \ + 16*[lambda i: (3*i + 5)%16] + \ + 16*[lambda i: (7*i)%16] + + def left_rotate(x, amount): # noqa + x &= 0xFFFFFFFF + return ((x<>(32-amount))) & 0xFFFFFFFF + + def md5(message): # noqa + message = bytearray(message) #copy our input into a mutable buffer + orig_len_in_bits = (8 * len(message)) & 0xffffffffffffffff + message.append(0x80) + while len(message)%64 != 56: + message.append(0) + message += orig_len_in_bits.to_bytes(8, 'little') + + hash_pieces = init_values[:] + + for chunk_ofst in range(0, len(message), 64): + a, b, c, d = hash_pieces + chunk = message[chunk_ofst:chunk_ofst+64] + for i in range(64): + f = functions[i](b, c, d) + g = index_functions[i](i) + to_rotate = a + f + constants[i] + int.from_bytes(chunk[4*g:4*g+4], 'little') + new_b = (b + left_rotate(to_rotate, rotate_amounts[i])) & 0xFFFFFFFF + a, b, c, d = d, new_b, b, c + + for i, val in enumerate([a, b, c, d]): + hash_pieces[i] += val + hash_pieces[i] &= 0xFFFFFFFF + return sum(x<<(32*i) for i, x in enumerate(hash_pieces)) + + def digest(message): # noqa + digest = md5(message) + raw = digest.to_bytes(16, 'little') + return '{:032x}'.format(int.from_bytes(raw, 'big')) diff --git a/notecard/notecard.py b/notecard/notecard.py index d36f2b6..357a3e0 100644 --- a/notecard/notecard.py +++ b/notecard/notecard.py @@ -36,12 +36,12 @@ import time from .timeout import start_timeout, has_timed_out from .transaction_manager import TransactionManager, NoOpTransactionManager +from .crc32 import crc32 use_periphery = False use_serial_lock = False if sys.implementation.name == 'cpython' and (sys.platform == 'linux' or sys.platform == 'linux2'): - use_periphery = True from periphery import I2C @@ -54,19 +54,32 @@ class SerialLockTimeout(Exception): pass +use_i2c_lock = not use_periphery and sys.implementation.name != 'micropython' + NOTECARD_I2C_ADDRESS = 0x17 +NOTECARD_I2C_MAX_TRANSFER_DEFAULT = 255 -# The notecard is a real-time device that has a fixed size interrupt buffer. -# We can push data at it far, far faster than it can process it, -# therefore we push it in segments with a pause between each segment. +# The notecard is a real-time device that has a fixed size interrupt buffer. We +# can push data at it far, far faster than it can process it. Therefore, we push +# it in segments with a pause between each segment. CARD_REQUEST_SEGMENT_MAX_LEN = 250 -# "a 250ms delay is required to separate "segments", ~256 byte -# I2C transactions." See +# "a 250ms delay is required to separate "segments", ~256 byte I2C +# transactions." See # https://dev.blues.io/guides-and-tutorials/notecard-guides/serial-over-i2c-protocol/#data-write CARD_REQUEST_SEGMENT_DELAY_MS = 250 # "A 20ms delay is commonly used to separate smaller I2C transactions known as # 'chunks'". See the same document linked above. -I2C_CHUNK_DELAY_MS = 20 +CARD_REQUEST_I2C_CHUNK_DELAY_MS = 20 +# The delay, in miliseconds, to wait after receiving a NACK I2C. +CARD_REQUEST_I2C_NACK_WAIT_MS = 1000 +# The number of times to retry syncing up with the Notecard during a reset +# before giving up. +CARD_RESET_SYNC_RETRIES = 10 +# The time, in miliseconds, to drain incoming messages during a reset. +CARD_RESET_DRAIN_MS = 500 +CARD_INTER_TRANSACTION_TIMEOUT_SEC = 30 +CARD_INTRA_TRANSACTION_TIMEOUT_SEC = 1 +CARD_TRANSACTION_RETRIES = 5 class NoOpContextManager: @@ -88,51 +101,16 @@ def acquire(*args, **kwargs): """Acquire the no-op lock.""" return NoOpContextManager() - -def serial_lock(fn): - """Attempt to get a lock on the serial channel used for Notecard comms.""" - - def decorator(self, *args, **kwargs): - try: - with self.lock.acquire(timeout=5): - return fn(self, *args, **kwargs) - except SerialLockTimeout: - raise Exception('Notecard in use') - - return decorator - - -def i2c_lock(fn): - """Attempt to get a lock on the I2C bus used for Notecard comms.""" - - def decorator(self, *args, **kwargs): - retries = 5 - while retries != 0: - if self.lock(): - break - - retries -= 1 - # Try again after 100 ms. - time.sleep(.1) - - if retries == 0: - raise Exception('Failed to acquire I2C lock.') - - try: - ret = fn(self, *args, **kwargs) - finally: - self.unlock() - - return ret - - return decorator + def release(*args, **kwargs): + """Release the no-op lock.""" + pass class Notecard: """Base Notecard class.""" def __init__(self, debug=False): - """Configure user agent.""" + """Initialize the Notecard object.""" self._user_agent_app = None self._user_agent_sent = False self._user_agent = { @@ -147,6 +125,74 @@ def __init__(self, debug=False): self._user_agent['os_family'] = os.uname().machine self._transaction_manager = NoOpTransactionManager() self._debug = debug + self._last_request_seq_number = 0 + self._card_supports_crc = False + self._reset_required = True + + def _crc_add(self, req_string, seq_number): + """Add a CRC field to the request. + + The CRC field also contains a sequence number and has this format: + + "crc":"SSSS:CCCCCCCC" + + SSSS is the sequence number encoded as a string of 4 hex digits. + CCCCCCCC is the CRC32 encoded as a string of 8 hex digits. + """ + req_bytes = req_string.encode('utf-8') + crc_hex = '{:08x}'.format(crc32(req_bytes)) + seq_number_hex = '{:04x}'.format(seq_number) + crc_field = f'"crc":"{seq_number_hex}:{crc_hex}"' + req_string_w_crc = req_string[:-1] + if req_string[-2] == '{': + req_string_w_crc += f'{crc_field}' + else: + req_string_w_crc += f',{crc_field}' + req_string_w_crc += '}' + + return req_string_w_crc + + def _crc_error(self, rsp_bytes): + """Check the CRC in a Notecard response.""" + rsp_json = json.loads(rsp_bytes) + if 'crc' not in rsp_json: + # If there's not a 'crc' field in the response, it's only an error + # if the Notecard supports CRC. + return self._card_supports_crc + + self._card_supports_crc = True + + # Extract the sequence number and CRC. We do all this via string + # operations instead of decoding the JSON. In Python, numbers with long + # decimal parts (e.g. 10.11111111111111123445522123) get truncated in + # the decoding process. When re-encoded, the string representation is + # also truncated, and so the CRC will be computed over a different + # string than was originally sent, resulting in a CRC error. + seq_number, crc = rsp_json['crc'].split(':') + # Remove the 'crc' field from the response. + rsp_str = rsp_bytes.decode() + rsp_str_crc_removed = rsp_str.split('"crc":')[0] + if rsp_str_crc_removed[-1] == ',': + rsp_str_crc_removed = rsp_str_crc_removed[:-1] + '}' + else: + rsp_str_crc_removed = rsp_str_crc_removed.rstrip() + '}' + + # Compute the CRC over the response, with the 'crc' field removed. + bytes_for_crc = rsp_str_crc_removed.encode('utf-8') + computed_crc = '{:08x}'.format(crc32(bytes_for_crc)).upper() + expected_seq_number = '{:04x}'.format(self._last_request_seq_number) + + if seq_number != expected_seq_number: + if self._debug: + print('Sequence number mismatch. Expected ' + \ + f'{expected_seq_number}, received {seq_number}.') + return True + elif crc != computed_crc: + if self._debug: + print(f'CRC error. Computed {computed_crc}, received {crc}.') + return True + + return False def _prepare_request(self, req): """Prepare a request for transmission to the Notecard.""" @@ -158,34 +204,185 @@ def _prepare_request(self, req): self._user_agent_sent = True - # Serialize the JSON request to a string. - req_string = json.dumps(req) + rsp_expected = 'req' in req + + # If this is a request and not a command, add a CRC. + req_string = json.dumps(req, separators=(',', ':')) + if rsp_expected: + req_string = self._crc_add(req_string, + self._last_request_seq_number) + + # Serialize the JSON request to a string, removing any unnecessary + # whitespace. if self._debug: print(req_string) req_string += "\n" # Encode the request string as UTF-8 bytes. - return req_string.encode('utf-8') + return (req_string.encode('utf-8'), rsp_expected) + + def _transaction_timeout_seconds(self, req): + """Determine the timeout to use, in seconds, for the transaction. + + When note.add or web.* requests are used to transfer binary data, the + time to complete the transaction varies depending on the size of the + payload and network conditions. Therefore, it's possible for these + transactions to timeout prematurely. + + This method does the following: + - If the request is a `note.add`, set the timeout value to the + value of the "milliseconds" parameter, if it exists. If it + doesn't, use the "seconds" parameter. If that doesn't exist, + use the standard timeout of `CARD_INTER_TRANSACTION_TIMEOUT_SEC`. + - If the request is a `web.*`, follow the same logic, but instead + of using the standard timeout, use 90 seconds for all `web.*` + transactions. + """ + timeout_secs = CARD_INTER_TRANSACTION_TIMEOUT_SEC + if 'req' in req: + req_key = 'req' + elif 'cmd' in req: + req_key = 'cmd' + else: + raise Exception('Malformed request. Missing \'req\' or \'cmd\' ' + \ + f'field: {req}.') + + if req[req_key] == 'note.add': + if 'milliseconds' in req: + timeout_secs = req['milliseconds'] / 1000 + elif 'seconds' in req: + timeout_secs = req['seconds'] + elif 'web.' in req[req_key]: + if 'milliseconds' in req: + timeout_secs = req['milliseconds'] / 1000 + elif 'seconds' in req: + timeout_secs = req['seconds'] + else: + timeout_secs = 90 - def Command(self, req): - """Send a command to the Notecard. The Notecard response is ignored.""" - if 'cmd' not in req: - raise Exception("Please use 'cmd' instead of 'req'") + if self._debug: + print(f'Using transaction timeout of {timeout_secs} seconds.') - req_bytes = self._prepare_request(req) - self._transact(req_bytes, False) + return timeout_secs - def Transaction(self, req): - """Perform a Notecard transaction and return the result.""" - req_bytes = self._prepare_request(req) - rsp_bytes = self._transact(req_bytes, True) - rsp_json = json.loads(rsp_bytes) - if self._debug: + def Transaction(self, req, lock=True): + """Send a request to the Notecard and read back a response. + + If the request is a command (indicated by using 'cmd' in the request + instead of 'req'), don't return a response. + + The underlying transport channel (serial or I2C) is locked for the + duration of the request and response if `lock` is True. + """ + rsp_json = None + timeout_secs = self._transaction_timeout_seconds(req) + req_bytes, rsp_expected = self._prepare_request(req) + + if self._reset_required: + self.Reset() + + try: + self._transaction_manager.start(CARD_INTER_TRANSACTION_TIMEOUT_SEC) + if lock: + self.lock() + + retries_left = CARD_TRANSACTION_RETRIES + error = False + if rsp_expected: + while retries_left > 0: + try: + rsp_bytes = self._transact( + req_bytes, rsp_expected=True, + timeout_secs=timeout_secs) + except Exception as e: + if self._debug: + print(e) + + error = True + self.Reset() + retries_left -= 1 + time.sleep(0.5) + continue + + if self._crc_error(rsp_bytes): + if self._debug: + print('CRC error on response from Notecard.') + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + + try: + rsp_json = json.loads(rsp_bytes) + except Exception as e: + if self._debug: + print(e) + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + + if 'err' in rsp_json: + if '{io}' in rsp_json['err']: + if self._debug: + print('Response has error field indicating ' + \ + f'I/O error: {rsp_json}') + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + elif '{bad-bin}' in rsp_json['err']: + if self._debug: + print('Response has error field indicating ' + \ + f'binary I/O error: {rsp_json}') + print('Not eligible for retry.') + + error = True + break + + error = False + break + else: + try: + self._transact(req_bytes, rsp_expected=False, + timeout_secs=timeout_secs) + except Exception as e: + error = True + if self._debug: + print(e) + + self._last_request_seq_number += 1 + + if error: + self._reset_required = True + raise Exception('Failed to transact with Notecard.') + + finally: + if lock: + self.unlock() + + self._transaction_manager.stop() + + if self._debug and rsp_json is not None: print(rsp_json) return rsp_json + def Command(self, req): + """Send a command to the Notecard. + + Unlike `Transaction`, `Command` doesn't return a response from the + Notecard. + """ + if 'cmd' not in req: + raise Exception("Please use 'cmd' instead of 'req'") + + self.Transaction(req) + def GetUserAgent(self): """Return the User Agent String for the host for debug purposes.""" ua_copy = self._user_agent.copy() @@ -208,96 +405,156 @@ def SetTransactionPins(self, rtx_pin, ctx_pin): class OpenSerial(Notecard): """Notecard class for Serial communication.""" - @serial_lock - def _transact(self, req, rsp_expected): - """Perform a low-level transaction with the Notecard.""" - rsp = None + def _transact(self, req_bytes, rsp_expected, + timeout_secs=CARD_INTER_TRANSACTION_TIMEOUT_SEC): + self.transmit(req_bytes) - try: - transaction_timeout_secs = 30 - self._transaction_manager.start(transaction_timeout_secs) - - seg_off = 0 - seg_left = len(req) - while seg_left > 0: - seg_len = seg_left - if seg_len > CARD_REQUEST_SEGMENT_MAX_LEN: - seg_len = CARD_REQUEST_SEGMENT_MAX_LEN - - self.uart.write(req[seg_off:seg_off + seg_len]) - seg_off += seg_len - seg_left -= seg_len - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + if not rsp_expected: + return - if rsp_expected: - rsp = self.uart.readline() - finally: - self._transaction_manager.stop() + start = start_timeout() + while not self._available(): + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception('Timed out while querying Notecard for ' + \ + 'available data.') - return rsp + # Delay for 10 ms before checking for available data again. + time.sleep(.01) - def _read_byte_micropython(self): - """Read a single byte from the Notecard (MicroPython).""" - if not self.uart.any(): - return None - return self.uart.read(1) + return self.receive() - def _read_byte_cpython(self): - """Read a single byte from the Notecard (CPython).""" - if self.uart.in_waiting == 0: - return None - return self.uart.read(1) + def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, + delay=True): + """Read a newline-terminated batch of data from the Notecard.""" + data = bytearray() + received_newline = False + start = start_timeout() + + while not received_newline: + while not self._available(): + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception('Timed out waiting to receive data from' + \ + ' Notecard.') + + # Sleep while awaiting the first byte (lazy). After the first + # byte, start to spin for the remaining bytes (greedy). + if delay and len(data) == 0: + time.sleep(.001) + + timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC + start = start_timeout() + byte = self._read_byte() + data.extend(byte) + received_newline = byte == b'\n' + + return data + + def transmit(self, data, delay=True): + """Send `data` to the Notecard.""" + seg_off = 0 + seg_left = len(data) + + while seg_left > 0: + seg_len = seg_left + if seg_len > CARD_REQUEST_SEGMENT_MAX_LEN: + seg_len = CARD_REQUEST_SEGMENT_MAX_LEN + + self.uart.write(data[seg_off:seg_off + seg_len]) + seg_off += seg_len + seg_left -= seg_len - def _read_byte_circuitpython(self): - """Read a single byte from the Notecard (CircuitPython).""" + if delay: + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + + def _available_micropython(self): + return self.uart.any() + + def _available_default(self): + return self.uart.in_waiting > 0 + + def _read_byte(self): + """Read a single byte from the Notecard.""" return self.uart.read(1) - @serial_lock def Reset(self): """Reset the Notecard.""" + if self._debug: + print('Resetting Notecard serial communications.') + + # Delay to give the Notecard a chance to process any segment sent prior + # to the coming reset sequence. + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + notecard_ready = False - for i in range(10): - try: - # Send a newline to the Notecard to terminate any partial - # request that might be sitting in its input buffer. - self.uart.write(b'\n') - except: - # Wait 500 ms and before trying to send the newline again. - time.sleep(.5) - continue + try: + self.lock() + + for i in range(CARD_RESET_SYNC_RETRIES): + try: + # Send a newline to the Notecard to terminate any partial + # request that might be sitting in its input buffer. + self.uart.write(b'\n') + except Exception as e: + if self._debug: + print(e) + # Wait CARD_RESET_DRAIN_MS and before trying to send the + # newline again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + continue + + something_found = False + non_control_char_found = False + # Drain serial for 500 ms. + start = start_timeout() + while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): + while self._available(): + something_found = True + data = self._read_byte() + if data[0] != ord('\n') and data[0] != ord('\r'): + non_control_char_found = True + # Reset the timer with each non-control character. + start = start_timeout() + + # If there was no data read from the Notecard, wait 1 ms and + # try again. Keep doing this for CARD_RESET_DRAIN_MS. + time.sleep(.001) + + if not something_found: + if self._debug: + print('Notecard not responding to newline during ' + \ + 'reset.') + + elif non_control_char_found: + if self._debug: + print('Received non-control characters from the ' + \ + 'Notecard during reset.') + else: + # If all we got back is newlines, we're in sync with the + # Notecard. + notecard_ready = True + break + + if self._debug: + print('Retrying reset...') + + # Wait CARD_RESET_DRAIN_MS before trying again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + + if not notecard_ready: + raise Exception('Failed to reset Notecard.') - something_found = False - non_control_char_found = False - # Drain serial for 500 ms. - start = start_timeout() - while not has_timed_out(start, 0.5): - data = self._read_byte() - # If data was read from the Notecard, inspect what we received. - # If it isn't a \n or \r, the host and the Notecard aren't - # synced up yet, and we'll need to retransmit the \n and try - # again. - while data is not None and data != b'': - something_found = True - if data[0] != ord('\n') and data[0] != ord('\r'): - non_control_char_found = True - - data = self._read_byte() - - # If there was no data read from the Notecard, wait 1 ms and try - # again. Keep doing this for 500 ms. - time.sleep(.001) - - # If we received anything other than newlines from the Notecard, we - # aren't in sync, yet. - if something_found and not non_control_char_found: - notecard_ready = True - break + finally: + self.unlock() - # Wait 500 ms before trying again. - time.sleep(.5) + self._reset_required = False - if not notecard_ready: - raise Exception('Failed to reset Notecard.') + def lock(self): + """Lock access to the serial bus.""" + self.lock_handle.acquire(timeout=5) + + def unlock(self): + """Unlock access to the serial bus.""" + self.lock_handle.release() def __init__(self, uart_id, debug=False): """Initialize the Notecard before a reset.""" @@ -308,18 +565,19 @@ def __init__(self, uart_id, debug=False): self.uart = uart_id if use_serial_lock: - self.lock = FileLock('serial.lock') + self.lock_handle = FileLock('serial.lock') else: - self.lock = NoOpSerialLock() + self.lock_handle = NoOpSerialLock() if sys.implementation.name == 'micropython': - self._read_byte = self._read_byte_micropython - elif sys.implementation.name == 'cpython': - self._read_byte = self._read_byte_cpython - elif sys.implementation.name == 'circuitpython': - self._read_byte = self._read_byte_circuitpython + self._available = self._available_micropython else: - raise NotImplementedError(f'Unsupported platform: {sys.implementation.name}') + if hasattr(self.uart, 'in_waiting'): + self._available = self._available_default + else: + raise NotImplementedError('Serial communications with the ' + \ + 'Notecard are not supported for ' + \ + 'this platform.') self.Reset() @@ -327,36 +585,8 @@ def __init__(self, uart_id, debug=False): class OpenI2C(Notecard): """Notecard class for I2C communication.""" - def _write(self, data): - write_length = bytearray(1) - write_length[0] = len(data) - - # Send a message with the length of the incoming bytes followed - # by the bytes themselves. - self._platform_write(write_length, data) - - def _transmit(self, data): - chunk_offset = 0 - data_left = len(data) - sent_in_seg = 0 - - while data_left > 0: - chunk_len = min(data_left, self.max) - write_data = data[chunk_offset:chunk_offset + chunk_len] - - self._write(write_data) - - chunk_offset += chunk_len - data_left -= chunk_len - sent_in_seg += chunk_len - - if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: - sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) - - time.sleep(I2C_CHUNK_DELAY_MS / 1000) - def _read(self, length): + """Perform a serial-over-I2C read.""" initiate_read = bytearray(2) # 0 indicates we are reading from the Notecard. initiate_read[0] = 0 @@ -366,116 +596,257 @@ def _read(self, length): # length accounts for the payload and the +2 is for the header. The # header sent by the Notecard has one byte to indicate the number of # bytes still available to read and a second byte to indicate the number - # of bytes coming in the current chunk. + # of bytes coming in the current packet. read_buf = bytearray(length + 2) - return self._platform_read(initiate_read, read_buf) + self._platform_read(initiate_read, read_buf) + # First two bytes are the header. + header = read_buf[0:2] + # The number of bytes still available to read after this packet. + available = header[0] + # The number of data bytes in this packet. + data_len = header[1] + # The rest is the data. + data = read_buf[2:] + + if len(data) != data_len: + raise Exception('Serial-over-I2C error: reported data length ' + \ + f'({data_len}) differs from actual data length' + \ + f' ({len(data)}).') + + return available, data - def _receive(self, timeout_secs, chunk_delay_secs, wait_for_newline): - chunk_len = 0 + def _write(self, data): + """Perform a serial-over-I2C write.""" + self._platform_write(bytearray([len(data)]), data) + + def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, + delay=True): + """Read a newline-terminated batch of data from the Notecard.""" + read_len = 0 received_newline = False + timeout_secs = CARD_INTER_TRANSACTION_TIMEOUT_SEC start = start_timeout() - read_data = bytearray() + received_data = bytearray() while True: - read_buf = self._read(chunk_len) - - # The number of bytes still available to read. - num_bytes_available = read_buf[0] - # The number of bytes in this chunk. - num_bytes_this_chunk = read_buf[1] - if num_bytes_this_chunk > 0: - read_data += read_buf[2:2 + num_bytes_this_chunk] - received_newline = read_buf[-1] == ord('\n') - - chunk_len = min(num_bytes_available, self.max) - # Keep going if there's still byte available to read, even if + available, data = self._read(read_len) + if len(data) > 0: + received_data += data + + timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC + start = start_timeout() + + if not received_newline: + received_newline = data[-1] == ord('\n') + + read_len = min(available, self.max) + # Keep going if there are still bytes available to read, even if # we've received a newline. - if chunk_len > 0: + if available > 0: continue - # Otherwise, if there's no bytes available to read and we either + # Otherwise, if there are no bytes available to read and we either # 1) don't care about waiting for a newline or 2) do care and # received the newline, we're done. - if not wait_for_newline or received_newline: + if received_newline: break - # Delay between reading chunks. Note that as long as bytes are - # available to read (i.e. chunk_len > 0), we don't delay here, nor - # do we check the timeout below. This is intentional and mimics the - # behavior of other SDKs (e.g. note-c). - time.sleep(chunk_delay_secs) - if timeout_secs != 0 and has_timed_out(start, timeout_secs): - raise Exception("Timed out while reading data from the Notecard.") + raise Exception('Timed out while reading data from the ' + \ + 'Notecard.') - return read_data + if delay: + time.sleep(0.05) - @i2c_lock - def _transact(self, req, rsp_expected): - """Perform a low-level transaction with the Notecard.""" - rsp = None + return received_data - try: - transaction_timeout_secs = 30 - self._transaction_manager.start(transaction_timeout_secs) + def transmit(self, data, delay=True): + """Send `data` to the Notecard.""" + chunk_offset = 0 + data_left = len(data) + sent_in_seg = 0 - self._transmit(req) + while data_left > 0: + # Delay for 5ms. This prevents a fast host from hammering a + # slow/busy Notecard with requests. + time.sleep(.005) - if rsp_expected: - rsp = self._receive(30, 0.05, True) - finally: - self._transaction_manager.stop() + chunk_len = min(data_left, self.max) + write_data = data[chunk_offset:chunk_offset + chunk_len] + self._write(write_data) - return rsp + chunk_offset += chunk_len + data_left -= chunk_len + sent_in_seg += chunk_len + + # We delay for CARD_REQUEST_SEGMENT_DELAY_MS ms every time a full + # "segment" of data has been transmitted. + if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: + sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN + + if delay: + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + + if delay: + time.sleep(CARD_REQUEST_I2C_CHUNK_DELAY_MS / 1000) + + def _transact(self, req_bytes, rsp_expected, + timeout_secs=CARD_INTER_TRANSACTION_TIMEOUT_SEC): + self.transmit(req_bytes) + + if not rsp_expected: + return + + # Delay for 5ms. This prevents a fast host from hammering a slow/busy + # Notecard with requests. + time.sleep(0.005) + + start = start_timeout() + available = 0 + while available == 0: + available, _ = self._read(0) + + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception('Timed out while querying Notecard for ' + \ + 'available data.') + + return self.receive() - @i2c_lock def Reset(self): """Reset the Notecard.""" - # Send a newline to the Notecard to terminate any partial request that - # might be sitting in its input buffer. - self._transmit(b'\n') + if self._debug: + print('Resetting Notecard I2C communications.') - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + notecard_ready = False + try: + self.lock() + + for i in range(CARD_RESET_SYNC_RETRIES): + try: + # Send a newline to the Notecard to terminate any partial + # request that might be sitting in its input buffer. + self._write(b'\n') + except Exception as e: + if self._debug: + print(e) + time.sleep(CARD_REQUEST_I2C_NACK_WAIT_MS / 1000) + continue - # Read from the Notecard until there's nothing left, retrying a max of 3 - # times. - retries = 3 - while retries > 0: - try: - self._receive(0, .001, False) - except: - retries -= 1 - else: - break + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) - if retries == 0: + something_found = False + non_control_char_found = False + + start = start_timeout() + read_len = 0 + while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): + try: + available, data = self._read(read_len) + except Exception as e: + if self._debug: + print(e) + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + continue + + if len(data) > 0: + something_found = True + # The Notecard responds to a bare `\n` with `\r\n`. If + # we get any other characters back, it means the host + # and Notecard aren't synced up yet, and we need to + # transmit `\n` again. + for byte in data: + if byte != ord('\n') and byte != ord('\r'): + non_control_char_found = True + # Reset the timer with each non-control + # character. + start = start_timeout() + + read_len = min(available, self.max) + + time.sleep(CARD_REQUEST_I2C_CHUNK_DELAY_MS / 1000) + + if not something_found: + if self._debug: + print('Notecard not responding to newline during ' + \ + 'reset.') + time.sleep(.005) + elif non_control_char_found: + if self._debug: + print('Received non-control characters from the ' + \ + 'Notecard during reset.') + else: + # If all we got back is newlines, we're in sync with the + # Notecard. + notecard_ready = True + break + + if self._debug: + print('Retrying reset...') + + # Wait CARD_RESET_DRAIN_MS before trying again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + finally: + self.unlock() + + if not notecard_ready: raise Exception('Failed to reset Notecard.') - def _linux_write(self, length, data): + self._reset_required = False + + def _cpython_write(self, length, data): # noqa: D403 + """CPython implementation of serial-over-I2C write.""" msgs = [I2C.Message(length + data)] self.i2c.transfer(self.addr, msgs) - def _non_linux_write(self, length, data): + def _non_cpython_write(self, length, data): + """Non-CPython implementation of serial-over-I2C write.""" self.i2c.writeto(self.addr, length + data) - def _linux_read(self, initiate_read_msg, read_buf): - msgs = [I2C.Message(initiate_read_msg), I2C.Message(read_buf, read=True)] + def _cpython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """CPython implementation of serial-over-I2C read.""" + msgs = [ + I2C.Message(initiate_read_msg), + I2C.Message(read_buf, read=True) + ] self.i2c.transfer(self.addr, msgs) - read_buf = msgs[1].data - - return read_buf + read_bytes = msgs[1].data + read_buf[:len(read_bytes)] = read_bytes - def _micropython_read(self, initiate_read_msg, read_buf): + def _micropython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """MicroPython implementation of serial-over-I2C read.""" self.i2c.writeto(self.addr, initiate_read_msg, False) self.i2c.readfrom_into(self.addr, read_buf) - return read_buf - - def _circuitpython_read(self, initiate_read_msg, read_buf): + def _circuitpython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """CircuitPython implementation of serial-over-I2C read.""" self.i2c.writeto_then_readfrom(self.addr, initiate_read_msg, read_buf) - return read_buf + def lock(self): + """Lock access to the I2C bus.""" + retries = 5 + while retries != 0: + if self.lock_fn(): + break + + retries -= 1 + # Try again after 100 ms. + time.sleep(.1) + + if retries == 0: + raise Exception('Failed to acquire I2C lock.') + + def unlock(self): + """Unlock access to the I2C bus.""" + self.unlock_fn() + + def _i2c_no_op_try_lock(*args, **kwargs): + """No-op lock function.""" + return True + + def _i2c_no_op_unlock(*args, **kwargs): + """No-op unlock function.""" + pass def __init__(self, i2c, address, max_transfer, debug=False): """Initialize the Notecard before a reset.""" @@ -485,41 +856,30 @@ def __init__(self, i2c, address, max_transfer, debug=False): self.i2c = i2c - def i2c_no_op_try_lock(*args, **kwargs): - """No-op lock function.""" - return True - - def i2c_no_op_unlock(*args, **kwargs): - """No-op unlock function.""" - pass - - use_i2c_lock = not use_periphery and sys.implementation.name != 'micropython' if use_i2c_lock: - self.lock = self.i2c.try_lock - self.unlock = self.i2c.unlock + self.lock_fn = self.i2c.try_lock + self.unlock_fn = self.i2c.unlock else: - self.lock = i2c_no_op_try_lock - self.unlock = i2c_no_op_unlock + self.lock_fn = self._i2c_no_op_try_lock + self.unlock_fn = self._i2c_no_op_unlock if address == 0: self.addr = NOTECARD_I2C_ADDRESS else: self.addr = address if max_transfer == 0: - self.max = 255 + self.max = NOTECARD_I2C_MAX_TRANSFER_DEFAULT else: self.max = max_transfer - if use_periphery: - self._platform_write = self._linux_write - self._platform_read = self._linux_read - elif sys.implementation.name == 'micropython': - self._platform_write = self._non_linux_write + if sys.implementation.name == 'micropython': + self._platform_write = self._non_cpython_write self._platform_read = self._micropython_read elif sys.implementation.name == 'circuitpython': - self._platform_write = self._non_linux_write + self._platform_write = self._non_cpython_write self._platform_read = self._circuitpython_read else: - raise NotImplementedError(f'Unsupported platform: {sys.implementation.name}') + self._platform_write = self._cpython_write + self._platform_read = self._cpython_read self.Reset() diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/hitl/conftest.py b/test/hitl/conftest.py index d75dbab..3cc79b1 100644 --- a/test/hitl/conftest.py +++ b/test/hitl/conftest.py @@ -53,21 +53,19 @@ def setup_host(port, platform, mpy_board): mkdir_on_host(pyb, '/lib/notecard') copy_files_to_host(pyb, notecard_files, '/lib/notecard') - # Copy over mpy_example.py. We'll run this example code on the MicroPython - # host to 1) verify that the host is able to use note-python to communicate - # with the Notecard and 2) verify that the example isn't broken. + examples_dir = note_python_root_dir / 'examples' + example_files = [examples_dir / 'binary-mode' / 'binary_loopback_example.py'] if platform == 'circuitpython': - example_file = 'cpy_example.py' + example_files.append(examples_dir / 'notecard-basics' / 'cpy_example.py') else: - example_file = 'mpy_example.py' + example_files.append(examples_dir / 'notecard-basics' / 'mpy_example.py') if mpy_board: boards_dir = note_python_root_dir / 'mpy_board' board_file_path = boards_dir / f"{mpy_board}.py" copy_file_to_host(pyb, board_file_path, '/board.py') - examples_dir = note_python_root_dir / 'examples' - example_file_path = examples_dir / 'notecard-basics' / example_file - copy_file_to_host(pyb, example_file_path, '/example.py') + for file in example_files: + copy_file_to_host(pyb, file, f'/{file.name}') pyb.close() diff --git a/test/hitl/example_runner.py b/test/hitl/example_runner.py new file mode 100644 index 0000000..e336622 --- /dev/null +++ b/test/hitl/example_runner.py @@ -0,0 +1,23 @@ +import pyboard + + +class ExampleRunner: + def __init__(self, pyboard_port, example_file, product_uid): + self.pyboard_port = pyboard_port + self.example_module = example_file[:-3] # Remove .py suffix. + self.product_uid = product_uid + + def run(self, use_uart, assert_success=True): + pyb = pyboard.Pyboard(self.pyboard_port, 115200) + pyb.enter_raw_repl() + try: + cmd = f'from {self.example_module} import run_example; run_example("{self.product_uid}", {use_uart})' + output = pyb.exec(cmd) + output = output.decode() + finally: + pyb.exit_raw_repl() + pyb.close() + + print(output) + assert 'Example complete.' in output + return output diff --git a/test/hitl/test_basic_comms.py b/test/hitl/test_basic_comms.py index 854be15..a14bd60 100644 --- a/test/hitl/test_basic_comms.py +++ b/test/hitl/test_basic_comms.py @@ -1,24 +1,20 @@ import pyboard import pytest +from example_runner import ExampleRunner -def run_example(port, product_uid, use_uart): - pyb = pyboard.Pyboard(port, 115200) - pyb.enter_raw_repl() - try: - cmd = f'from example import run_example; run_example("{product_uid}", {use_uart})' - output = pyb.exec(cmd) - output = output.decode() - print(output) - assert 'Example complete.' in output - finally: - pyb.exit_raw_repl() - pyb.close() +def run_basic_comms_test(config, use_uart): + if config.platform == 'micropython': + example_file = 'mpy_example.py' + elif config.platform == 'circuitpython': + example_file = 'cpy_example.py' + else: + raise Exception(f'Unsupported platform: {config.platform}') + runner = ExampleRunner(config.port, example_file, config.product_uid) + runner.run(use_uart) -def test_example_i2c(pytestconfig): - run_example(pytestconfig.port, pytestconfig.product_uid, use_uart=False) - -def test_example_serial(pytestconfig): - run_example(pytestconfig.port, pytestconfig.product_uid, use_uart=True) +@pytest.mark.parametrize('use_uart', [False, True]) +def test_basic_comms(pytestconfig, use_uart): + run_basic_comms_test(pytestconfig, use_uart) diff --git a/test/hitl/test_binary.py b/test/hitl/test_binary.py new file mode 100644 index 0000000..a0a12b7 --- /dev/null +++ b/test/hitl/test_binary.py @@ -0,0 +1,10 @@ +import pyboard +import pytest +from example_runner import ExampleRunner + + +@pytest.mark.parametrize('use_uart', [False, True]) +def test_binary(pytestconfig, use_uart): + runner = ExampleRunner(pytestconfig.port, 'binary_loopback_example.py', + pytestconfig.product_uid) + runner.run(use_uart) diff --git a/test/test_binary_helpers.py b/test/test_binary_helpers.py new file mode 100644 index 0000000..3385005 --- /dev/null +++ b/test/test_binary_helpers.py @@ -0,0 +1,581 @@ +import os +import sys +import pytest +from unittest.mock import MagicMock, patch + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import notecard # noqa: E402 +from notecard.cobs import cobs_encode # noqa: E402 +from notecard.binary_helpers import ( # noqa: E402 + binary_store_decoded_length, + binary_store_reset, + binary_store_receive, + binary_store_transmit, + _md5_hash, + BINARY_RETRIES +) + + +@pytest.fixture +def arrange_test(): + def _arrange_test(): + card = notecard.Notecard() + card.Transaction = MagicMock() + card.lock = MagicMock() + card.unlock = MagicMock() + + return card + + yield _arrange_test + + +@pytest.fixture +def arrange_rx_test(arrange_test): + def _arrange_rx_test(bad_md5=False): + card = arrange_test() + + # Set up receive. + rx_data = bytearray.fromhex('deadbeef0a') + card.receive = MagicMock(return_value=rx_data) + + # Set up Transaction. + if bad_md5: + rsp = {'status': 'abc'} + else: + # This is the MD5 of 0xdeadbeef. Note that 0x0a is omitted -- that's + # the newline that terminates the binary payload. It isn't included + # in the MD5 calculation. + rsp = {'status': '2f249230a8e7c2bf6005ccd2679259ec'} + card.Transaction.return_value = rsp + + # Don't actually do any COBS decoding. Just return the received data. + # data minus the newline. + notecard.binary_helpers.cobs_decode = MagicMock( + return_value=rx_data[:-1]) + + return card + + yield _arrange_rx_test + + +@pytest.fixture +def arrange_tx_test(arrange_test): + def _arrange_tx_test(transmit_exception=None, **kwargs): + card = arrange_test() + card.transmit = MagicMock() + + CARD_BINARY_PRE_TRANSMIT = 0 + CARD_BINARY_PUT = 1 + CARD_BINARY_POST_TRANSMIT = 2 + DONE = 3 + + class TransactionSideEffect: + '''Iterable for the Transaction method that uses a state machine for + various binary_store_transmit scenarios exercised in these unit + tests.''' + def __init__(self, pre_transmit_err=None, maximum=1024, length=0, + card_binary_put_err=None, + card_binary_put_exception=None, post_transmit_err=None, + retry_transmit_forever=False): + # 'err' field value in the pre-transmission card.binary + # response. + self.pre_transmit_err = pre_transmit_err + # 'max' field value in the pre-transmission card.binary + # response. + self.maximum = maximum + # 'length' field value in the pre-transmission card.binary + # response. + self.length = length + # 'err' field value in the card.binary.put response. + self.card_binary_put_err = card_binary_put_err + # Raise an exception on the card.binary.put request, using the + # string `card_binary_put_exception`. + self.card_binary_put_exception = card_binary_put_exception + # 'err' field value in the post-transmission card.binary + # response. + self.post_transmit_err = post_transmit_err + # If this is true, post_transmit_err will not be cleared on a + # transmission retry. This results in continuous retrying. + self.retry_transmit_forever = retry_transmit_forever + + def __iter__(self): + self.state = CARD_BINARY_PRE_TRANSMIT + + def __next__(self): + # This is the aforementioned state machine. The first + # Transaction call in binary_store_transmit is a card.binary + # request. + if self.state == CARD_BINARY_PRE_TRANSMIT: + rsp = {'length': self.length} + if self.maximum: + rsp['max'] = self.maximum + if self.pre_transmit_err: + rsp['err'] = self.pre_transmit_err + self.state = CARD_BINARY_PUT + + # The second call is card.binary.put. + elif self.state == CARD_BINARY_PUT: + if self.card_binary_put_exception: + rsp = Exception(self.card_binary_put_exception) + elif self.card_binary_put_err: + rsp = {'err': self.card_binary_put_err} + else: + rsp = {} + self.state = CARD_BINARY_POST_TRANSMIT + + # The third call is card.binary again. + elif self.state == CARD_BINARY_POST_TRANSMIT: + if self.post_transmit_err: + rsp = {'err': self.post_transmit_err} + # Unless we intend to error out on the post transmission + # card.binary continuously, clear the error here so that + # we don't hit it again on the retry. + if not self.retry_transmit_forever: + self.post_transmit_err = None + + self.state = CARD_BINARY_PUT + else: + rsp = {} + self.state = DONE + + elif self.state == DONE: + raise StopIteration + + return rsp + + card.Transaction.side_effect = TransactionSideEffect(**kwargs) + + if transmit_exception: + card.transmit.side_effect = Exception(transmit_exception) + + return card + + with patch('notecard.binary_helpers.cobs_encode', side_effect=cobs_encode): + yield _arrange_tx_test + + +@pytest.fixture +def tx_data(): + return bytearray.fromhex(('82f9a19b7e02c95bd87b38a8ce479608830ee68ffe4d1763' + 'ecc205681ed537a3')) + + +def get_card_binary_put_call(card): + '''Find the card.binary.put request by searching through the calls to + card.Transaction.''' + card_binary_put_call = None + for call in card.Transaction.call_args_list: + req = call[0][0] + if req['req'] == 'card.binary.put': + card_binary_put_call = call + break + + return card_binary_put_call + + +class TestBinaryStoreDecodedLength: + # binary_store_decoded_length tests. + def test_makes_card_binary_request(self, arrange_test): + card = arrange_test() + + binary_store_decoded_length(card) + + card.Transaction.assert_called_once_with({'req': 'card.binary'}) + + def test_returns_zero_if_no_length_field(self, arrange_test): + card = arrange_test() + card.Transaction.return_value = {} + + assert binary_store_decoded_length(card) == 0 + + def test_returns_length_length_field_present(self, arrange_test): + card = arrange_test() + length = 99 + card.Transaction.return_value = {'length': length} + + assert binary_store_decoded_length(card) == length + + def test_ignores_bad_bin_errors(self, arrange_test): + card = arrange_test() + card.Transaction.return_value = {'err': '{bad-bin}'} + + binary_store_decoded_length(card) + + def test_raises_exception_on_non_bad_bin_error(self, arrange_test): + card = arrange_test() + err_msg = 'some error' + card.Transaction.return_value = {'err': err_msg} + exception_msg = f'Error in response to card.binary request: {err_msg}.' + + with pytest.raises(Exception, match=exception_msg): + binary_store_decoded_length(card) + + +class TestBinaryStoreReset: + def test_makes_card_binary_request_with_delete_true( + self, arrange_test): + card = arrange_test() + + binary_store_reset(card) + + card.Transaction.assert_called_once_with( + {'req': 'card.binary', 'delete': True}) + + def test_raises_exception_on_error_in_response(self, arrange_test): + card = arrange_test() + err_msg = 'some error' + card.Transaction.return_value = {'err': err_msg} + exception_msg = ('Error in response to card.binary delete request: ' + f'{err_msg}.') + + with pytest.raises(Exception, match=exception_msg): + binary_store_reset(card) + + +class TestBinaryStoreReceive: + def test_maps_card_binary_get_params_correctly(self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + expected_req = { + 'req': 'card.binary.get', + 'offset': offset, + 'length': length + } + + binary_store_receive(card, offset, length) + + card.Transaction.assert_called_once_with(expected_req, lock=False) + + def test_locks_and_unlocks(self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + + binary_store_receive(card, offset, length) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_unlocks_after_card_binary_get_exception(self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + exception_msg = 'Transaction failed.' + card.Transaction.side_effect = Exception(exception_msg) + + with pytest.raises(Exception, match=exception_msg): + binary_store_receive(card, offset, length) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_unlocks_after_receive_exception(self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + exception_msg = 'receive failed.' + card.receive.side_effect = Exception(exception_msg) + + with pytest.raises(Exception, match=exception_msg): + binary_store_receive(card, offset, length) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_raises_exception_if_error_in_card_binary_get_response( + self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + err_msg = 'some error' + card.Transaction.return_value['err'] = err_msg + + with pytest.raises(Exception, match=err_msg): + binary_store_receive(card, offset, length) + + def test_unlocks_if_error_in_card_binary_get_response( + self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + err_msg = 'some error' + card.Transaction.return_value['err'] = err_msg + + with pytest.raises(Exception, match=err_msg): + binary_store_receive(card, offset, length) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_queues_reset_after_receive_exception(self, arrange_rx_test): + card = arrange_rx_test() + card._reset_required = False + offset = 11 + length = 99 + exception_msg = 'receive failed.' + card.receive.side_effect = Exception(exception_msg) + + with pytest.raises(Exception, match=exception_msg): + binary_store_receive(card, offset, length) + + assert card._reset_required + + def test_raises_exception_on_bad_md5(self, arrange_rx_test): + card = arrange_rx_test(bad_md5=True) + offset = 11 + length = 99 + exception_msg = 'Computed MD5 does not match received MD5.' + + with pytest.raises(Exception, match=exception_msg): + binary_store_receive(card, offset, length) + + def test_calls_cobs_decode_on_rx_data_without_newline( + self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + + binary_store_receive(card, offset, length) + + # First, verify what was received ends in a newline. + assert card.receive.return_value[-1] == ord('\n') + # Then, check that cobs_decode was called with everything but that + # newline. + assert card.receive.return_value[:-1] == \ + notecard.binary_helpers.cobs_decode.call_args[0][0] + + def test_calls_cobs_decode_with_newline_as_eop(self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + + binary_store_receive(card, offset, length) + + assert notecard.binary_helpers.cobs_decode.call_args[0][1] == ord('\n') + + def test_returns_cobs_decoded_data(self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + + data = binary_store_receive(card, offset, length) + + assert notecard.binary_helpers.cobs_decode.return_value == data + + def test_computes_md5_over_cobs_decoded_data(self, arrange_rx_test): + card = arrange_rx_test() + offset = 11 + length = 99 + md5_fn = notecard.binary_helpers._md5_hash + + with patch('notecard.binary_helpers._md5_hash', + side_effect=md5_fn) as md5_mock: + binary_store_receive(card, offset, length) + + cobs_decoded_data = \ + notecard.binary_helpers.cobs_decode.call_args[0][0] + assert md5_mock.call_args[0][0] == cobs_decoded_data + + +class TestBinaryStoreTransmit: + def test_ignores_bad_bin_err_on_initial_card_binary_request( + self, tx_data, arrange_tx_test): + offset = 0 + card = arrange_tx_test(pre_transmit_err='{bad-bin}') + + binary_store_transmit(card, tx_data, offset) + + def test_fails_on_non_bad_bin_err_on_initial_card_binary_request( + self, tx_data, arrange_tx_test): + offset = 0 + err_msg = 'some error' + card = arrange_tx_test(pre_transmit_err=err_msg) + + with pytest.raises(Exception, match=err_msg): + binary_store_transmit(card, tx_data, offset) + + @pytest.mark.parametrize('maximum', [None, 0]) + def test_fails_on_invalid_max_on_initial_card_binary_request( + self, tx_data, arrange_tx_test, maximum): + offset = 0 + card = arrange_tx_test(maximum=maximum) + exception_msg = ('Unexpected card.binary response: max is zero or not ' + 'present.') + + with pytest.raises(Exception, match=exception_msg): + binary_store_transmit(card, tx_data, offset) + + def test_fails_if_offset_not_equal_to_length_reported_by_notecard( + self, tx_data, arrange_tx_test): + offset = 1 + card = arrange_tx_test() + exception_msg = 'Notecard data length is misaligned with offset.' + + with pytest.raises(Exception, match=exception_msg): + binary_store_transmit(card, tx_data, offset) + + def test_fails_if_data_length_exceeds_available_space_on_notecard( + self, tx_data, arrange_tx_test): + offset = 5 + card = arrange_tx_test(maximum=offset, length=offset) + exception_msg = ('Data to transmit won\'t fit in the Notecard\'s binary' + ' store.') + + with pytest.raises(Exception, match=exception_msg): + binary_store_transmit(card, tx_data, offset) + + def test_calls_cobs_encode_on_tx_data(self, tx_data, arrange_tx_test): + offset = 0 + card = arrange_tx_test() + + binary_store_transmit(card, tx_data, offset) + + assert notecard.binary_helpers.cobs_encode.call_args[0][0] == tx_data + + def test_calls_cobs_encode_with_newline_as_eop( + self, tx_data, arrange_tx_test): + offset = 0 + card = arrange_tx_test() + + binary_store_transmit(card, tx_data, offset) + + assert notecard.binary_helpers.cobs_encode.call_args[0][1] == ord('\n') + + def test_maps_card_binary_put_params_correctly( + self, tx_data, arrange_tx_test): + offset = 1 + card = arrange_tx_test(length=offset) + + binary_store_transmit(card, tx_data, offset) + + card_binary_put_call = get_card_binary_put_call(card) + # If we didn't find a card.binary.put request, something's wrong. + assert card_binary_put_call is not None + + req = card_binary_put_call[0][0] + assert req['cobs'] == len(cobs_encode(tx_data, ord('\n'))) + assert req['status'] == _md5_hash(tx_data) + assert req['offset'] == offset + + def test_sends_card_binary_put_with_lock_false( + self, tx_data, arrange_tx_test): + offset = 0 + card = arrange_tx_test() + + binary_store_transmit(card, tx_data, offset) + + card_binary_put_call = get_card_binary_put_call(card) + # If we didn't find a card.binary.put request, something's wrong. + assert card_binary_put_call is not None + + assert not card_binary_put_call[1]['lock'] + + def test_calls_transmit_with_delay_false(self, tx_data, arrange_tx_test): + offset = 0 + card = arrange_tx_test() + + binary_store_transmit(card, tx_data, offset) + + assert not card.transmit.call_args[1]['delay'] + + def test_calls_transmit_with_cobs_encoded_data_plus_newline( + self, tx_data, arrange_tx_test): + expected_data = cobs_encode(bytearray(tx_data), ord('\n')) + b'\n' + offset = 0 + card = arrange_tx_test() + + binary_store_transmit(card, tx_data, offset) + + assert card.transmit.call_args[0][0] == expected_data + + def test_raises_exception_on_card_binary_put_error( + self, tx_data, arrange_tx_test): + offset = 0 + err_msg = 'some error' + card = arrange_tx_test(card_binary_put_err=err_msg) + + with pytest.raises(Exception, match=err_msg): + binary_store_transmit(card, tx_data, offset) + + def test_locks_and_unlocks(self, tx_data, arrange_tx_test): + offset = 0 + card = arrange_tx_test() + + binary_store_transmit(card, tx_data, offset) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_unlocks_after_card_binary_put_exception( + self, tx_data, arrange_tx_test): + offset = 0 + exception_msg = 'card.binary.put failed' + card = arrange_tx_test( + card_binary_put_exception=exception_msg) + + with pytest.raises(Exception, match=exception_msg): + binary_store_transmit(card, tx_data, offset) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_unlocks_after_card_binary_put_error( + self, tx_data, arrange_tx_test): + offset = 0 + err_msg = 'some error' + card = arrange_tx_test(card_binary_put_err=err_msg) + + with pytest.raises(Exception, match=err_msg): + binary_store_transmit(card, tx_data, offset) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_unlocks_after_transmit_exception(self, tx_data, arrange_tx_test): + offset = 0 + exception_msg = 'transmit failed' + card = arrange_tx_test( + transmit_exception=exception_msg) + + with pytest.raises(Exception, match=exception_msg): + binary_store_transmit(card, tx_data, offset) + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_retries_on_bad_bin_error_after_transmit( + self, tx_data, arrange_tx_test): + offset = 0 + card = arrange_tx_test(post_transmit_err='{bad-bin}') + + binary_store_transmit(card, tx_data, offset) + + # transmit should've been called once and then retried once due to the + # {bad-bin} error. + assert card.transmit.call_count == 2 + + def test_fails_on_card_binary_error_after_transmission( + self, tx_data, arrange_tx_test): + offset = 0 + err_msg = 'some error' + card = arrange_tx_test(post_transmit_err=err_msg) + + with pytest.raises(Exception, match=err_msg): + binary_store_transmit(card, tx_data, offset) + + def test_fails_after_running_out_of_retries( + self, tx_data, arrange_tx_test): + offset = 0 + exception_msg = 'Failed to transmit binary data.' + card = arrange_tx_test(post_transmit_err='{bad-bin}', + retry_transmit_forever=True) + + with pytest.raises(Exception, match=exception_msg): + binary_store_transmit(card, tx_data, offset) + + # transmit should've been called once and then retried BINARY_RETRIES + # times. + assert card.transmit.call_count == BINARY_RETRIES + 1 diff --git a/test/test_cobs.py b/test/test_cobs.py new file mode 100644 index 0000000..ddf2834 --- /dev/null +++ b/test/test_cobs.py @@ -0,0 +1,134 @@ +import sys +import os +import pytest +import random + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from notecard.cobs import cobs_encode, cobs_decode # noqa: E402 + + +@pytest.fixture +def test_data(): + data = [ + 0x42, 0x15, 0x14, 0x56, 0x4A, 0x79, 0x17, 0xB3, 0x20, + 0x7E, 0x3D, 0x61, 0x4C, 0x93, 0xA3, 0x33, 0xE9, 0x81, + 0xED, 0x37, 0xA8, 0x35, 0x4D, 0xEF, 0xDA, 0x88, 0xC5, + 0x7F, 0x6F, 0xE8, 0x34, 0x38, 0x46, 0x99, 0x9E, 0xCA, + 0x6D, 0x41, 0x85, 0x03, 0xEA, 0x8C, 0x87, 0x30, 0x68, + 0x33, 0x2D, 0x69, 0x72, 0xF6, 0xAC, 0xDA, 0x58, 0x8A, + 0x1C, 0xB6, 0x8F, 0x66, 0x14, 0x3B, 0x8E, 0xB9, 0x6B, + 0x0E, 0x47, 0xC0, 0x96, 0xFE, 0x2B, 0xE0, 0x58, 0xF4, + 0xE0, 0xB7, 0x8D, 0x9C, 0xED, 0xDE, 0x55, 0x31, 0xB6, + 0xB0, 0xAF, 0xB6, 0xBB, 0x3C, 0x3D, 0xC1, 0xFE, 0xAB, + 0xF4, 0xB9, 0xC8, 0x4C, 0xE4, 0xA1, 0x40, 0x1F, 0x82, + 0x21, 0xF5, 0x25, 0x2A, 0xCC, 0xBF, 0x43, 0xAB, 0x53, + 0x11, 0x16, 0x69, 0xDF, 0x34, 0x88, 0xC9, 0x9F, 0x7C, + 0xBD, 0x66, 0xAC, 0x59, 0x22, 0x62, 0x33, 0x1B, 0x4A, + 0xCB, 0x75, 0x2F, 0xBA, 0x10, 0x12, 0x17, 0x43, 0x35, + 0x28, 0xE1, 0x4D, 0xA2, 0xD0, 0xBF, 0xC3, 0x13, 0x2E, + 0xB2, 0x7A, 0x20, 0xAF, 0xD9, 0x9A, 0x0E, 0xBA, 0xDC, + 0x8E, 0x35, 0xD5, 0x53, 0xC7, 0xE8, 0x6B, 0xB4, 0x4F, + 0xC2, 0x97, 0x7F, 0xB5, 0x36, 0x6F, 0x5C, 0x51, 0x3A, + 0x71, 0x85, 0x35, 0x98, 0x4C, 0x66, 0xEE, 0x3E, 0x9B, + 0x3E, 0xD5, 0x66, 0xEA, 0x97, 0xA4, 0xCF, 0x96, 0xE1, + 0x26, 0x24, 0x69, 0xCD, 0x79, 0xEA, 0xD7, 0xF2, 0x70, + 0xD8, 0xD0, 0x59, 0x04, 0xFA, 0xBE, 0x96, 0xB2, 0x72, + 0x1D, 0xA6, 0xC9, 0xD6, 0x2D, 0xA3, 0x7D, 0x3F, 0x54, + 0xD2, 0x4E, 0xDE, 0x78, 0x82, 0x2C, 0x77, 0xD0, 0x33, + 0x04, 0xBD, 0x3B, 0x0F, 0xDC, 0x7A, 0x8D, 0x7A, 0xF6, + 0x1A, 0x3E, 0x09, 0xDC, 0xC1, 0x61, 0x41, 0xBC, 0x74, + 0xD9, 0xD4, 0xCA, 0x30, 0x84, 0x7D, 0x32, 0xDC, 0x10, + 0x61, 0xC1, 0x70, 0x25, 0x82, 0x85, 0xEE, 0x91, 0x8D, + 0x48, 0xCA, 0x40, 0x3F, 0x72, 0xA6, 0xC9, 0x0C, 0x02, + 0x2F, 0x2D, 0xE3, 0xD1, 0x4F, 0x04, 0x4C, 0xEA, 0x84, + 0x99, 0x19, 0xB0, 0x25, 0x3A, 0xA0, 0x9D, 0x82, 0x0E, + 0x0C, 0x33, 0x90, 0x1C, 0x98, 0x25, 0x89, 0x4D, 0xE7, + 0x1B, 0x11, 0xB1, 0x20, 0x55, 0x6C, 0xEA, 0xEC, 0xD4, + 0x19, 0x75, 0xE2, 0xA7, 0xC6, 0x71, 0x61, 0x8C, 0xB6, + 0x71, 0xC6, 0x00, 0x6F, 0x00, 0x8B, 0x7E, 0x8F, 0x7A, + 0xA1, 0xBC, 0xDE, 0x38, 0x2E, 0x22, 0x04, 0x4B, 0x55, + 0x21, 0xA0, 0xE0, 0x3E, 0x14, 0x41, 0x91, 0x33, 0x60, + 0x8B, 0xCE, 0xE4, 0x07, 0xD1, 0xE9, 0x15, 0x60, 0x5D, + 0x76, 0xDC, 0x86, 0x3E, 0xFB, 0xE6, 0x86, 0xE9, 0x69, + 0xA5, 0xC4, 0x5F, 0x62, 0x70, 0x1C, 0x8E, 0x11, 0x74, + 0xD5, 0x7C, 0x29, 0x7F, 0x0B, 0x42, 0x43, 0x4D, 0x73, + 0x73, 0x57, 0xE2, 0x2D, 0x68, 0xC0, 0x57, 0xC9, 0xED, + 0xAF, 0xF9, 0x0B, 0xFD, 0xA0, 0x93, 0x81, 0x01, 0x1C, + 0x01, 0x7A, 0xB2, 0xC2, 0x23, 0x45, 0x22, 0xCD, 0x63, + 0xAC, 0x58, 0x56, 0x0D, 0x7E, 0xFB, 0xF4, 0x27, 0xED, + 0x5B, 0x1C, 0x47, 0x76, 0xBF, 0x14, 0xE7, 0xAF, 0x15, + 0x67, 0x01, 0x02, 0x33, 0x99, 0x95, 0xD2, 0x4E, 0x3E, + 0x8D, 0xB8, 0xFD, 0x93, 0x36, 0xAF, 0x5C, 0x67, 0x41, + 0xF4, 0x17, 0xDF, 0x5C, 0xD0, 0xBC, 0xE0, 0xAA, 0x5F, + 0xD0, 0x5B, 0xBE, 0xBC, 0x02, 0x30, 0x7B, 0x84, 0xC4, + 0x92, 0x5D, 0xE4, 0x30, 0xFD, 0x66, 0x11, 0x43, 0x44, + 0x5F, 0xD7, 0x29, 0x5A, 0x80, 0x6D, 0x7F, 0x4A, 0xC0, + 0x6F, 0xC9, 0x61, 0x93, 0xFD, 0x5F, 0x37, 0xF7, 0x67, + 0x7B, 0xD4, 0x6D, 0x07, 0xE4, 0x5B, 0x3D, 0x5F, 0x89, + 0x12, 0xE7, 0x2D, 0x07, 0x28, 0x37, 0x41, 0x70, 0xD4, + 0x8F, 0x0F, 0xAA, 0xE9, 0xF6, 0x3B, 0x7D, 0x7F + ] + + return data + + +class TestCobs: + def test_encoded_data_does_not_contain_eop(self, test_data): + # The code below randomly selects 20 elements of test_data and + # overwrites them with 0x0A. The encoding process, if correct, is + # guaranteed to eliminate 0x0A from the data if 0x0A is provided as the + # EOP byte. + eop = 0x0A + for idx in random.sample(range(len(test_data)), 20): + test_data[idx] = eop + + encoded_data = cobs_encode(bytearray(test_data), eop) + + for b in encoded_data: + assert b != eop + + def test_encoding_overhead_is_consistent(self, test_data): + # Make sure our unencoded data doesn't contain the byte 0x00. + for idx, b in enumerate(test_data): + if b == 0x00: + test_data[idx] = 0x01 + + # The COBS algorithm guarantees that for spans of data not containing + # any zero bytes, an overhead byte will be added every 254 bytes. + # There's also an overhead byte that gets added to the beginning of the + # encoded data, too. + overhead = 1 + len(test_data) // 254 + eop = 0x0A + + encoded_data = cobs_encode(bytearray(test_data), eop) + + assert len(encoded_data) == (len(test_data) + overhead) + + def test_decode_returns_original_data(self, test_data): + eop = 0x0A + input_data = bytearray(test_data) + encoded_data = cobs_encode(input_data, eop) + + decoded_data = cobs_decode(encoded_data, eop) + + assert input_data == decoded_data + + def test_encode_does_not_mutate_input_data(self, test_data): + eop = 0x0A + input_data = bytearray(test_data) + original_data = input_data[:] # This slicing ensures we make a copy. + + cobs_encode(input_data, eop) + + assert input_data == original_data + + def test_decode_does_not_mutate_input_data(self, test_data): + eop = 0x0A + input_data = cobs_encode(bytearray(test_data), eop) + original_data = input_data[:] # This slicing ensures we make a copy. + + cobs_decode(input_data, eop) + + assert input_data == original_data diff --git a/test/test_i2c.py b/test/test_i2c.py new file mode 100644 index 0000000..acbd46f --- /dev/null +++ b/test/test_i2c.py @@ -0,0 +1,479 @@ +import os +import sys +import pytest +import re +from unittest.mock import MagicMock, patch +from .unit_test_utils import TrueOnNthIteration, BooleanToggle + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import notecard # noqa: E402 + + +@pytest.fixture +def arrange_test(): + def _arrange_test(address=0, max_transfer=0, debug=False, + mock_locking=True): + # OpenI2C's __init__ will call Reset, which we don't care about + # actually doing here, so we mock Reset. + with patch('notecard.notecard.OpenI2C.Reset'): + card = notecard.OpenI2C(MagicMock(), address, max_transfer, + debug) + + if mock_locking: + card.lock = MagicMock() + card.unlock = MagicMock() + + return card + + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_test + + +@pytest.fixture +def arrange_reset_test(arrange_test): + def _arrange_reset_test(): + card = arrange_test() + card._write = MagicMock() + card._read = MagicMock() + + return card + + yield _arrange_reset_test + + +@pytest.fixture +def arrange_transact_test(arrange_test): + def _arrange_transact_test(): + card = arrange_test() + card.transmit = MagicMock() + card.receive = MagicMock() + req_bytes = card._prepare_request({'req': 'card.version'}) + + return card, req_bytes + + yield _arrange_transact_test + + +@pytest.fixture +def arrange_read_test(arrange_test): + def _arrange_read_test(available, data_len, data): + def _platform_read_side_effect(initiate_read_msg, read_buf): + read_buf[0] = available + read_buf[1] = data_len + read_buf[2:] = data + + card = arrange_test() + card._platform_read = MagicMock( + side_effect=_platform_read_side_effect) + + return card + + yield _arrange_read_test + + +class TestI2C: + # Reset tests. + def test_reset_succeeds_on_good_notecard_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + assert not card._reset_required + + def test_reset_sends_a_newline_to_clear_stale_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card._write.assert_called_once_with(b'\n') + + def test_reset_locks_and_unlocks(self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_unlocks_after_exception(self, arrange_reset_test): + card = arrange_reset_test() + card._write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_fails_if_continually_reads_non_control_chars( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (1, 1, b'h') + + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + def test_reset_required_if_reset_fails(self, arrange_reset_test): + card = arrange_reset_test() + card._write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + assert card._reset_required + + # __init__ tests. + def test_init_calls_reset(self): + with patch('notecard.notecard.OpenI2C.Reset') as reset_mock: + notecard.OpenI2C(MagicMock(), 0, 0) + + reset_mock.assert_called_once() + + @pytest.mark.parametrize( + 'addr_param,expected_addr', + [ + (0, notecard.NOTECARD_I2C_ADDRESS), + (7, 7) + ] + ) + def test_init_sets_address_correctly( + self, addr_param, expected_addr, arrange_test): + card = arrange_test(address=addr_param) + + assert card.addr == expected_addr + + @pytest.mark.parametrize( + 'max_param,expected_max', + [ + (0, notecard.NOTECARD_I2C_MAX_TRANSFER_DEFAULT), + (7, 7) + ] + ) + def test_init_sets_max_transfer_correctly( + self, max_param, expected_max, arrange_test): + card = arrange_test(max_transfer=max_param) + + assert card.max == expected_max + + @pytest.mark.parametrize('debug_param', [False, True]) + def test_init_sets_debug_correctly(self, debug_param, arrange_test): + card = arrange_test(debug=debug_param) + + assert card._debug == debug_param + + @pytest.mark.parametrize('use_i2c_lock', [False, True]) + def test_init_uses_appropriate_locking_functions( + self, use_i2c_lock, arrange_test): + with patch('notecard.notecard.use_i2c_lock', new=use_i2c_lock): + card = arrange_test() + + if use_i2c_lock: + assert card.lock_fn == card.i2c.try_lock + assert card.unlock_fn == card.i2c.unlock + else: + assert card.lock_fn.__func__ == \ + notecard.OpenI2C._i2c_no_op_try_lock + assert card.unlock_fn.__func__ == \ + notecard.OpenI2C._i2c_no_op_unlock + + @pytest.mark.parametrize( + 'platform,write_method,read_method', + [ + ( + 'micropython', + notecard.OpenI2C._non_cpython_write, + notecard.OpenI2C._micropython_read + ), + ( + 'circuitpython', + notecard.OpenI2C._non_cpython_write, + notecard.OpenI2C._circuitpython_read + ), + ( + 'cpython', + notecard.OpenI2C._cpython_write, + notecard.OpenI2C._cpython_read + ), + ] + ) + def test_init_sets_platform_hooks_correctly( + self, platform, write_method, read_method, arrange_test): + with patch('notecard.notecard.sys.implementation.name', new=platform): + card = arrange_test() + + assert card._platform_write.__func__ == write_method + assert card._platform_read.__func__ == read_method + + def test_user_agent_indicates_i2c_after_init(self, arrange_test): + card = arrange_test() + userAgent = card.GetUserAgent() + + assert userAgent['req_interface'] == 'i2c' + assert userAgent['req_port'] is not None + + # receive tests. + def test_receive_returns_all_data_bytes_from_read(self, arrange_test): + card = arrange_test() + payload = b'{}\r\n' + card._read = MagicMock() + card._read.side_effect = [ + # There are 4 bytes available to read, and there are no more bytes + # to read in this packet. + (4, bytearray()), + # 0 bytes available to read after this packet. 4 coming in this + # packet, and they are {}\r\n. + (0, payload) + ] + + rx_data = card.receive() + + assert rx_data == payload + + def test_receive_keeps_reading_if_data_available_after_newline( + self, arrange_test): + card = arrange_test() + payload = b'{}\r\n' + excess_data = b'io' + card._read = MagicMock() + card._read.side_effect = [ + # There are 4 bytes available to read, and there are no more bytes + # to read in this packet. + (4, bytearray()), + # 2 bytes available to read after this packet. 4 coming in this + # packet, and they are {}\r\n. + (2, payload), + # 0 bytes after this packet. 2 coming in this packet, and they are + # io. + (0, excess_data) + ] + + rx_data = card.receive() + + assert rx_data == (payload + excess_data) + + def test_receive_raises_exception_on_timeout(self, arrange_test): + card = arrange_test() + payload = b'{}\r' + card._read = MagicMock() + card._read.side_effect = [ + # There are 3 bytes available to read, and there are no more bytes + # to read in this packet. + (3, bytearray()), + # 0 bytes available to read after this packet. 3 coming in this + # packet, and they are {}\r. The lack of a newline at the end will + # cause this test to hit the timeout. + (0, payload) + ] + + with patch('notecard.notecard.has_timed_out', return_value=True): + with pytest.raises(Exception, match=('Timed out while reading ' + 'data from the Notecard.')): + card.receive() + + # transmit tests. + def test_transmit_writes_all_data_bytes(self, arrange_test): + card = arrange_test() + # Create a bytearray to transmit. It should be larger than a single I2C + # chunk (i.e. greater than card.max), and it should not fall neatly onto + # a segment boundary. + data_len = card.max * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + write_mock = MagicMock() + card._write = write_mock + + card.transmit(data) + + # Using the argument history of the _write mock, assemble a bytearray of + # the data passed to write. + written = bytearray() + for write_call in write_mock.call_args_list: + segment = write_call[0][0] + written += segment + + # Verify that all the data we passed to transmit was in fact passed to + # uart.write. + assert data == written + + def test_transmit_does_not_exceed_max_transfer_size(self, arrange_test): + card = arrange_test() + # Create a bytearray to transmit. It should be larger than a single + # I2C chunk (i.e. greater than card.max), and it should not fall neatly + # onto a segment boundary. + data_len = card.max * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + write_mock = MagicMock() + card._write = write_mock + + card.transmit(data) + + for write_call in write_mock.call_args_list: + assert len(write_call[0][0]) <= card.max + + # _transact tests. + def test_transact_calls_transmit_with_req_bytes( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + card._transact(req_bytes, rsp_expected=False) + + card.transmit.assert_called_once_with(req_bytes) + + def test_transact_returns_none_if_rsp_not_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + rsp = card._transact(req_bytes, rsp_expected=False) + + assert rsp is None + + def test_transact_returns_not_none_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(1, bytearray())) + + rsp = card._transact(req_bytes, rsp_expected=True) + + assert rsp is not None + + def test_transact_calls_receive_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(1, bytearray())) + + card._transact(req_bytes, rsp_expected=True) + + card.receive.assert_called_once() + + def test_transact_raises_exception_on_timeout(self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(0, bytearray())) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, + match=('Timed out while querying Notecard for ' + 'available data.')): + card._transact(req_bytes, rsp_expected=True) + + # _read tests. + def test_read_sends_the_initial_read_packet_correctly( + self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) + # To start a read from the Notecard using serial-over-I2C, the host + # should send a 0 byte followed by a byte with the requested read + # length. + expected_packet = bytearray(2) + expected_packet[0] = 0 + expected_packet[1] = data_len + + card._read(data_len) + + card._platform_read.assert_called_once() + assert card._platform_read.call_args[0][0] == expected_packet + + def test_read_sizes_read_buf_correctly(self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) + header_len = 2 + expected_read_buffer_len = header_len + data_len + + card._read(data_len) + + card._platform_read.assert_called_once() + assert len(card._platform_read.call_args[0][1]) == \ + expected_read_buffer_len + + def test_read_parses_data_correctly(self, arrange_read_test): + available = 8 + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) + + actual_available, actual_data = card._read(len(data)) + + card._platform_read.assert_called_once() + assert actual_available == available + assert actual_data == data + + def test_read_raises_exception_if_data_length_does_not_match_data( + self, arrange_read_test): + available = 8 + # The reported length is 5, but the actual length is 4. + data_len = 5 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) + + exception_msg = re.escape(('Serial-over-I2C error: reported data length' + f' ({data_len}) differs from actual data ' + f'length ({len(data)}).')) + with pytest.raises(Exception, match=exception_msg): + card._read(len(data)) + + # _write tests. + def test_write_calls_platform_write_correctly(self, arrange_test): + card = arrange_test() + card._platform_write = MagicMock() + data = bytearray([0xDE, 0xAD, 0xBE, 0xEF]) + + card._write(data) + + card._platform_write.assert_called_once_with( + bytearray([len(data)]), data) + + # lock tests. + def test_lock_calls_lock_fn(self, arrange_test): + card = arrange_test(mock_locking=False) + card.lock_fn = MagicMock(return_value=True) + + card.lock() + + card.lock_fn.assert_called() + + def test_lock_retries_lock_fn_if_needed(self, arrange_test): + card = arrange_test(mock_locking=False) + # Fails the first time and succeeds the second time. + card.lock_fn = MagicMock(side_effect=[False, True]) + + card.lock() + + assert card.lock_fn.call_count == 2 + + def test_lock_raises_exception_if_lock_fn_never_returns_true( + self, arrange_test): + card = arrange_test(mock_locking=False) + card.lock_fn = MagicMock(return_value=False) + + with pytest.raises(Exception, match='Failed to acquire I2C lock.'): + card.lock() + + # unlock tests. + def test_unlock_calls_unlock_fn(self, arrange_test): + card = arrange_test(mock_locking=False) + card.unlock_fn = MagicMock() + + card.unlock() + + card.unlock_fn.assert_called() diff --git a/test/test_notecard.py b/test/test_notecard.py index 7bcd4b5..4cdbb8d 100644 --- a/test/test_notecard.py +++ b/test/test_notecard.py @@ -1,46 +1,413 @@ import os import sys import pytest -from unittest.mock import Mock, MagicMock, patch -import periphery +from unittest.mock import MagicMock, patch import json +import re sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import notecard # noqa: E402 +from notecard.transaction_manager import TransactionManager, NoOpTransactionManager # noqa: E402 -def get_serial_and_port(): - serial = Mock() # noqa: F811 - port = serial.Serial("/dev/tty.foo", 9600) - port.read.side_effect = [b'\r', b'\n', None] - port.readline.return_value = "\r\n" - - # Patch the Reset method so that we don't actually call it during __init__. - with patch('notecard.notecard.OpenSerial.Reset'): - nCard = notecard.OpenSerial(port) - - return (nCard, port) - - -def get_i2c_and_port(): - periphery = Mock() # noqa: F811 - port = periphery.I2C("dev/i2c-foo") - port.try_lock.return_value = True - - # Patch the Reset method so that we don't actually call it during __init__. - with patch('notecard.notecard.OpenI2C.Reset'): - nCard = notecard.OpenI2C(port, 0x17, 255) - - return (nCard, port) - - -class NotecardTest: - +@pytest.fixture +def arrange_transaction_test(): + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + def _arrange_transaction_test(): + card = notecard.Notecard() + card.Reset = MagicMock() + card.lock = MagicMock() + card.unlock = MagicMock() + card._transact = MagicMock(return_value=b'{}\r\n') + card._crc_error = MagicMock(return_value=False) + + return card + + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_transaction_test + + +class TestNotecard: + # _transaction_manager tests. + def test_txn_manager_is_no_op_before_pins_set(self): + card = notecard.Notecard() + + assert isinstance(card._transaction_manager, NoOpTransactionManager) + + def test_txn_manager_is_valid_after_pins_set(self): + card = notecard.Notecard() + with patch('notecard.notecard.TransactionManager', autospec=True): + card.SetTransactionPins(1, 2) + + assert isinstance(card._transaction_manager, TransactionManager) + + # _crc_add tests + def test_crc_add_adds_a_crc_field(self): + card = notecard.Notecard() + req = '{"req":"hub.status"}' + + req_string = card._crc_add(req, 0) + + req_json = json.loads(req_string) + assert 'crc' in req_json + + def test_crc_add_formats_the_crc_field_correctly(self): + card = notecard.Notecard() + req = '{"req":"hub.status"}' + seq_number = 37 + + req_string = card._crc_add(req, seq_number) + + req_json = json.loads(req_string) + # The format should be SSSS:CCCCCCCC, where S and C are hex digits + # comprising the sequence number and CRC32, respectively. + pattern = r'^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{8}$' + assert re.match(pattern, req_json['crc']) + + # _crc_error tests. + @pytest.mark.parametrize('crc_supported', [False, True]) + def test_crc_error_handles_lack_of_crc_field_correctly(self, crc_supported): + card = notecard.Notecard() + card._card_supports_crc = crc_supported + rsp_bytes = b'{}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error == crc_supported + + def test_crc_error_returns_error_if_sequence_number_wrong(self): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + # Sequence number should be 37 (0x25), but the response has 38 (0x26). + rsp_bytes = b'{"crc":"0026:A3A6BF43"}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error + + def test_crc_error_returns_error_if_crc_wrong(self): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + # CRC should be A3A6BF43. + rsp_bytes = b'{"crc":"0025:A3A6BF44"}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error + + @pytest.mark.parametrize( + 'rsp_bytes', + [ + # Without CRC, the response is {}. + b'{"crc":"0025:A3A6BF43"}\r\n', + # Without CRC, the response is {"connected": true}. This makes sure + # _crc_error handles the "," between the two fields properly. + b'{"connected": true,"crc": "0025:025A2457"}\r\n' + ] + ) + def test_crc_error_returns_no_error_if_sequence_number_and_crc_ok( + self, rsp_bytes): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + + error = card._crc_error(rsp_bytes) + + assert not error + + # Transaction tests. + def arrange_transaction_test(self): + card = notecard.Notecard() + card.Reset = MagicMock() + card.lock = MagicMock() + card.unlock = MagicMock() + card._transact = MagicMock(return_value=b'{}\r\n') + card._crc_error = MagicMock(return_value=False) + + return card + + @pytest.mark.parametrize('reset_required', [False, True]) + def test_transaction_calls_reset_if_needed( + self, arrange_transaction_test, reset_required): + card = arrange_transaction_test() + card._reset_required = reset_required + req = {"req": "hub.status"} + + card.Transaction(req) + + if reset_required: + card.Reset.assert_called_once() + else: + card.Reset.assert_not_called() + + @pytest.mark.parametrize('lock', [False, True]) + def test_transaction_handles_locking_correctly( + self, arrange_transaction_test, lock): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + card.Transaction(req, lock=lock) + + if lock: + card.lock.assert_called_once() + card.unlock.assert_called_once() + else: + card.lock.assert_not_called() + card.unlock.assert_not_called() + + @pytest.mark.parametrize('lock', [False, True]) + def test_transaction_handles_locking_after_exception_correctly( + self, arrange_transaction_test, lock): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises(Exception, match='Failed to transact with Notecard.'): + card.Transaction(req, lock=lock) + + if lock: + card.lock.assert_called_once() + card.unlock.assert_called_once() + else: + card.lock.assert_not_called() + card.unlock.assert_not_called() + + def test_transaction_calls_txn_manager_start_and_stop( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transaction_manager = MagicMock() + req = {"req": "hub.status"} + + card.Transaction(req) + + card._transaction_manager.start.assert_called_once() + card._transaction_manager.stop.assert_called_once() + + def test_transaction_calls_txn_manager_stop_after_exception( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transaction_manager = MagicMock() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card._transaction_manager.start.assert_called_once() + card._transaction_manager.stop.assert_called_once() + + def test_transaction_calls_reset_if_transact_fails( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._reset_required = False + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card.Reset.assert_called() + + def test_transaction_retries_on_transact_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_crc_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._crc_error.return_value = True + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_failure_to_parse_json_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + side_effect=Exception('json.loads failed.')): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_io_error_in_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + return_value={'err': 'some {io} error'}): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_does_not_retry_on_bad_bin_error_in_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + return_value={'err': 'a {bad-bin} error'}): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == 1 + + @pytest.mark.parametrize( + 'rsp_expected,return_type', + [ + (False, type(None)), + (True, dict) + ] + ) + def test_transaction_returns_proper_type( + self, rsp_expected, return_type, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + + rsp_json = card.Transaction(req) + + assert isinstance(rsp_json, return_type) + + def test_transaction_does_not_retry_if_transact_fails_and_no_response_expected( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock(return_value=(req_bytes, False)) + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card._transact.assert_called_once() + + @pytest.mark.parametrize('rsp_expected', [False, True]) + def test_transaction_increments_sequence_number_on_success( + self, rsp_expected, arrange_transaction_test): + card = arrange_transaction_test() + seq_number_before = card._last_request_seq_number + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + + card.Transaction(req) + + seq_number_after = card._last_request_seq_number + assert seq_number_after == seq_number_before + 1 + + @pytest.mark.parametrize('rsp_expected', [False, True]) + def test_transaction_increments_sequence_number_after_exception( + self, rsp_expected, arrange_transaction_test): + card = arrange_transaction_test() + seq_number_before = card._last_request_seq_number + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + card._transact.side_effect = Exception('_transact failed.') + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + seq_number_after = card._last_request_seq_number + assert seq_number_after == seq_number_before + 1 + + def test_transaction_queues_up_a_reset_on_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._reset_required = False + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._reset_required + + # Command tests. + def test_command_returns_none(self): + card = notecard.Notecard() + card.Transaction = MagicMock() + + rsp = card.Command({'cmd': 'hub.set'}) + + # A command generates no response, by definition. + assert rsp is None + + def test_command_fails_if_given_req(self): + card = notecard.Notecard() + + # Can't issue a command with 'req', must use 'cmd'. + with pytest.raises(Exception): + card.Command({'req': 'card.sleep'}) + + # UserAgentSent tests. + def test_user_agent_not_sent_before_hub_set(self): + card = notecard.Notecard() + + assert not card.UserAgentSent() + + @pytest.mark.parametrize( + 'request_method,request_key', + [ + ('Transaction', 'req'), + ('Command', 'cmd') + ] + ) + def test_user_agent_sent_after_hub_set(self, arrange_transaction_test, + request_method, request_key): + card = arrange_transaction_test() + + req = dict() + req[request_key] = 'hub.set' + method = getattr(card, request_method) + method(req) + + assert card.UserAgentSent() + + # GetUserAgent tests. def test_get_user_agent(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() + card = notecard.Notecard() + userAgent = card.GetUserAgent() assert userAgent['agent'] == 'note-python' assert userAgent['os_name'] is not None @@ -48,153 +415,22 @@ def test_get_user_agent(self): assert userAgent['os_version'] is not None assert userAgent['os_family'] is not None - def test_transaction(self): - nCard, port = self.get_port("{\"connected\":true}\r\n") - - response = nCard.Transaction({"req": "hub.status"}) - - assert "connected" in response - assert response["connected"] is True - - @patch('notecard.notecard.TransactionManager') - def test_setting_transaction_pins(self, transaction_manager_mock): - nCard, _ = self.get_port("{\"connected\":true}\r\n") - - nCard.SetTransactionPins(1, 2) - nCard.Transaction({"req": "hub.status"}) - - # If transaction pins have been set, start and stop should be called - # once for each Transaction call. - nCard._transaction_manager.start.assert_called_once() - nCard._transaction_manager.stop.assert_called_once() - - def test_command(self): - nCard, port = self.get_port() - - response = nCard.Command({"cmd": "card.sleep"}) - - assert response is None - - def test_command_fail_if_req(self): - nCard, port = self.get_port() - - with pytest.raises(Exception, - match="Please use 'cmd' instead of 'req'"): - nCard.Command({"req": "card.sleep"}) - - def test_user_agent_sent_is_false_before_hub_set(self): - nCard, _ = self.get_port() - - assert nCard.UserAgentSent() is False - - def test_send_user_agent_in_hub_set_transaction(self): - nCard, port = self.get_port("{\"connected\":true}\r\n") - - nCard.Transaction({"req": "hub.set"}) - - assert nCard.UserAgentSent() is True - - def get_port(self, response=None): - raise NotImplementedError("subclasses must implement `get_port()`") - - -class TestNotecardMockSerial(NotecardTest): - - def get_port(self, response=None): - nCard, port = get_serial_and_port() - if response is not None: - port.readline.return_value = response - return (nCard, port) - - def test_user_agent_is_serial_when_serial_used(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() - - assert userAgent['req_interface'] == 'serial' - assert userAgent['req_port'] is not None - - def test_open_serial(self): - nCard, _ = get_serial_and_port() - - assert nCard.uart is not None - - def test_debug_mode_on_serial(self): - # Patch the Reset method so that we don't actually call it during - # __init__. - with patch('notecard.notecard.OpenSerial.Reset'): - port = MagicMock() - nCard = notecard.OpenSerial(port, debug=True) - - assert nCard._debug - - -class TestNotecardMockI2C(NotecardTest): - - def get_port(self, response=None): - nCard, port = get_i2c_and_port() - if response is not None: - chunklen = 0 - tosend = bytes(response, 'utf-8') - - def writeto_then_readfrom(addr, write, read): - nonlocal chunklen, tosend - read[0] = len(tosend) - read[1] = chunklen - read[2:2 + chunklen] = tosend[0:chunklen] - tosend = tosend[chunklen:] - chunklen = len(tosend) - - def transfer(addr, messages: periphery.I2C.Message): - if len(messages) == 2 and messages[1].read: - read = messages[1].data - writeto_then_readfrom(addr, messages[0].data, read) - - port.writeto_then_readfrom = writeto_then_readfrom - port.transfer = transfer - return (nCard, port) - - def test_open_i2c(self): - nCard, _ = get_i2c_and_port() - - assert nCard.i2c is not None - - def test_user_agent_is_i2c_when_i2c_used(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() - - assert userAgent['req_interface'] == 'i2c' - assert userAgent['req_port'] is not None - - def test_debug_mode_on_i2c(self): - periphery = Mock() # noqa: F811 - port = periphery.I2C("dev/i2c-foo") - port.try_lock.return_value = True - - nCard = notecard.OpenI2C(port, 0x17, 255, debug=True) - - assert nCard._debug - - -class MockNotecard(notecard.Notecard): - - def Reset(self): - pass - - -class TestUserAgent: - - def setUserAgentInfo(self, info=None): - nCard = MockNotecard() - orgReq = {"req": "hub.set"} - nCard.SetAppUserAgent(info) - req = json.loads(nCard._prepare_request(orgReq)) + # SetAppUserAgent tests. + def set_user_agent_info(self, info=None): + card = notecard.Notecard() + req = {"req": "hub.set"} + card.SetAppUserAgent(info) + req = json.loads(card._prepare_request(req)[0]) return req - def test_amends_hub_set_request(self): - req = self.setUserAgentInfo() + def test_set_app_user_agent_amends_hub_set_request(self): + req = self.set_user_agent_info() + assert req['body'] is not None - def test_adds_app_info(self): + def test_set_app_user_agent_adds_app_info_to_hub_set_request(self): info = {"app": "myapp"} - req = self.setUserAgentInfo(info) + + req = self.set_user_agent_info(info) + assert req['body']['app'] == 'myapp' diff --git a/test/test_serial.py b/test/test_serial.py new file mode 100644 index 0000000..198e47e --- /dev/null +++ b/test/test_serial.py @@ -0,0 +1,351 @@ +import os +import sys +import pytest +from unittest.mock import MagicMock, patch +from filelock import FileLock +from contextlib import AbstractContextManager +from .unit_test_utils import TrueOnNthIteration, BooleanToggle + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import notecard # noqa: E402 +from notecard import NoOpSerialLock, NoOpContextManager # noqa: E402 + + +@pytest.fixture +def arrange_test(): + def _arrange_test(debug=False): + # OpenSerial's __init__ will call Reset, which we don't care about + # actually doing here, so we mock Reset. + with patch('notecard.notecard.OpenSerial.Reset'): + card = notecard.OpenSerial(MagicMock(), debug=debug) + + return card + + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_test + + +@pytest.fixture +def arrange_reset_test(arrange_test): + def _arrange_reset_test(): + card = arrange_test() + card.lock = MagicMock() + card.unlock = MagicMock() + card.uart.write = MagicMock() + + return card + + yield _arrange_reset_test + + +@pytest.fixture +def tramsit_test_data(): + # Create a bytearray to transmit. It should be larger than a single segment, + # and it should not fall neatly onto a segment boundary. + data_len = notecard.CARD_REQUEST_SEGMENT_MAX_LEN * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + + return data + + +@pytest.fixture +def arrange_transact_test(arrange_test): + def _arrange_transact_test(): + card = arrange_test() + card.transmit = MagicMock() + card.receive = MagicMock() + req_bytes = card._prepare_request({'req': 'card.version'}) + + return card, req_bytes + + yield _arrange_transact_test + + +class TestSerial: + # Reset tests. + def test_reset_succeeds_on_good_notecard_response(self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + assert not card._reset_required + + def test_reset_sends_a_newline_to_clear_stale_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.uart.write.assert_called_once_with(b'\n') + + def test_reset_locks_and_unlocks(self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_unlocks_after_exception(self, arrange_reset_test): + card = arrange_reset_test() + card.uart.write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_fails_if_continually_reads_non_control_chars( + self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=BooleanToggle(True)) + card._read_byte = MagicMock(return_value=b'h') + + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + def test_reset_required_if_reset_fails(self, arrange_reset_test): + card = arrange_reset_test() + card.uart.write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + assert card._reset_required + + # __init__ tests. + @patch('notecard.notecard.OpenSerial.Reset') + def test_init_calls_reset(self, reset_mock): + notecard.OpenSerial(MagicMock()) + + reset_mock.assert_called_once() + + @pytest.mark.parametrize( + 'use_serial_lock,lock_type', + [ + (False, NoOpSerialLock), + (True, FileLock) + ] + ) + def test_init_creates_appropriate_lock_type( + self, use_serial_lock, lock_type, arrange_test): + with patch('notecard.notecard.use_serial_lock', new=use_serial_lock): + card = arrange_test() + + assert isinstance(card.lock_handle, lock_type) + + def test_init_fails_if_not_micropython_and_uart_has_no_in_waiting_attr( + self): + exception_msg = ('Serial communications with the Notecard are not ' + 'supported for this platform.') + + with patch('notecard.notecard.sys.implementation.name', new='cpython'): + with patch('notecard.notecard.OpenSerial.Reset'): + with pytest.raises(Exception, match=exception_msg): + notecard.OpenSerial(42) + + @pytest.mark.parametrize( + 'platform,available_method', + [ + ('micropython', notecard.OpenSerial._available_micropython), + ('cpython', notecard.OpenSerial._available_default), + ('circuitpython', notecard.OpenSerial._available_default), + ] + ) + def test_available_method_is_set_correctly_on_init( + self, platform, available_method, arrange_test): + with patch('notecard.notecard.sys.implementation.name', new=platform): + card = arrange_test() + + assert card._available.__func__ == available_method + + @pytest.mark.parametrize('debug', [False, True]) + def test_debug_set_correctly_on_init(self, debug, arrange_test): + card = arrange_test(debug) + + assert card._debug == debug + + def test_user_agent_indicates_serial_after_init(self, arrange_test): + card = arrange_test() + userAgent = card.GetUserAgent() + + assert userAgent['req_interface'] == 'serial' + assert userAgent['req_port'] is not None + + # _transact tests. + def test_transact_calls_transmit_with_req_bytes( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + card._transact(req_bytes, rsp_expected=False) + + card.transmit.assert_called_once_with(req_bytes) + + def test_transact_returns_none_if_rsp_not_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + rsp = card._transact(req_bytes, rsp_expected=False) + + assert rsp is None + + def test_transact_returns_not_none_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=True) + + rsp = card._transact(req_bytes, rsp_expected=True) + + assert rsp is not None + + def test_transact_calls_receive_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=True) + + card._transact(req_bytes, rsp_expected=True) + + card.receive.assert_called_once() + + def test_transact_raises_exception_on_timeout(self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=False) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, + match=('Timed out while querying Notecard for ' + 'available data.')): + card._transact(req_bytes, rsp_expected=True) + + # transmit tests. + def test_transmit_writes_all_data_bytes( + self, arrange_test, tramsit_test_data): + card = arrange_test() + card.uart.write = MagicMock() + + card.transmit(tramsit_test_data, True) + + # Using the argument history of the uart.write mock, assemble a + # bytearray of the data passed to uart.write. + written = bytearray() + for write_call in card.uart.write.call_args_list: + segment = write_call[0][0] + written += segment + # Verify that all the data we passed to transmit was in fact passed to + # uart.write. + assert tramsit_test_data == written + + def test_transmit_does_not_exceed_max_segment_length( + self, arrange_test, tramsit_test_data): + card = arrange_test() + card.uart.write = MagicMock() + + card.transmit(tramsit_test_data) + + for write_call in card.uart.write.call_args_list: + segment = write_call.args[0] + assert len(segment) <= notecard.CARD_REQUEST_SEGMENT_MAX_LEN + + # receive tests. + def test_receive_raises_exception_on_timeout(self, arrange_test): + card = arrange_test() + card._available = MagicMock(return_value=False) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=[False, True]): + with pytest.raises(Exception, match=('Timed out waiting to receive ' + 'data from Notecard.')): + card.receive() + + def test_receive_returns_all_bytes_from_read_byte( + self, arrange_test): + card = arrange_test() + read_byte_mock = MagicMock() + read_byte_mock.side_effect = [b'{', b'}', b'\r', b'\n'] + card._read_byte = read_byte_mock + card._available = MagicMock(return_value=True) + expected_data = bytearray('{}\r\n'.encode('utf-8')) + + data = card.receive() + + # Verify that all the bytes returned by _read_byte were returned as a + # bytearray by receive. + assert data == expected_data + + # _read_byte tests. + def test_read_byte_calls_uart_read(self, arrange_test): + card = arrange_test() + card.uart.read = MagicMock() + + card._read_byte() + + card.uart.read.assert_called_once_with(1) + + # NoOpSerialLock tests. + def test_no_op_serial_lock_implements_acquire_and_release(self): + no_op_lock = NoOpSerialLock() + + assert hasattr(no_op_lock, 'acquire') + assert hasattr(no_op_lock, 'release') + + def test_no_op_serial_lock_acquire_returns_no_op_context_manager(self): + no_op_lock = NoOpSerialLock() + + assert isinstance(no_op_lock.acquire(), NoOpContextManager) + + def test_no_op_serial_lock_acquire_accepts_timeout_arg(self): + no_op_lock = NoOpSerialLock() + + no_op_lock.acquire(timeout=10) + + # NoOpContextManager tests. + def test_no_op_context_manager_is_a_context_manager(self): + manager = NoOpContextManager() + + with manager: + pass + + assert isinstance(manager, AbstractContextManager) + + # lock/unlock tests. + def test_lock_calls_acquire_on_underlying_lock(self, arrange_test): + card = arrange_test() + lock_handle_mock = MagicMock() + card.lock_handle = lock_handle_mock + + card.lock() + + lock_handle_mock.acquire.assert_called_once() + + def test_unlock_calls_release_on_underlying_lock(self, arrange_test): + card = arrange_test() + lock_handle_mock = MagicMock() + card.lock_handle = lock_handle_mock + + card.unlock() + + lock_handle_mock.release.assert_called_once() diff --git a/test/unit_test_utils.py b/test/unit_test_utils.py new file mode 100644 index 0000000..9bc8ad1 --- /dev/null +++ b/test/unit_test_utils.py @@ -0,0 +1,40 @@ +class TrueOnNthIteration: + """Iterable that returns False until Nth iteration, then it returns True.""" + + def __init__(self, n): + """Set the iteration to return True on.""" + self.n = n + + def __iter__(self): + self.current = 1 + return self + + def __next__(self): + if self.current > self.n: + raise StopIteration + elif self.current == self.n: + result = True + else: + result = False + + self.current += 1 + + return result + + +class BooleanToggle: + """Iterable that returns a toggling boolean.""" + + def __init__(self, initial_value): + """Set the initial state (i.e. False or True).""" + self.initial_value = initial_value + + def __iter__(self): + self.current = self.initial_value + return self + + def __next__(self): + result = self.current + self.current = not self.current + + return result