diff --git a/Makefile b/Makefile index be6237e..5ddaf78 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ flake8: # F403 'from module import *' used; unable to detect undefined names https://www.flake8rules.com/rules/F403.html # W503 Line break occurred before a binary operator https://www.flake8rules.com/rules/W503.html # E501 Line too long (>79 characters) https://www.flake8rules.com/rules/E501.html - ${PYTHON} -m flake8 test/ notecard/ examples/ mpy_board/ --count --ignore=E722,F401,F403,W503,E501 --show-source --statistics + ${PYTHON} -m flake8 test/ notecard/ examples/ mpy_board/ --count --ignore=E722,F401,F403,W503,E501,E502 --show-source --statistics coverage: ${RUN_VENV_ACTIVATE} diff --git a/examples/notecard-basics/cpy_example.py b/examples/notecard-basics/cpy_example.py index 5ab42ec..bf6db92 100644 --- a/examples/notecard-basics/cpy_example.py +++ b/examples/notecard-basics/cpy_example.py @@ -14,20 +14,6 @@ import busio # noqa: E402 -def NotecardExceptionInfo(exception): - """Construct a formatted Exception string. - - Args: - exception (Exception): An exception object. - - Returns: - string: a summary of the exception with line number and details. - """ - name = exception.__class__.__name__ - return sys.platform + ": " + name \ - + ": " + " ".join(map(str, exception.args)) - - def configure_notecard(card, product_uid): """Submit a simple JSON-based request to the Notecard. @@ -39,11 +25,7 @@ def configure_notecard(card, product_uid): req["product"] = product_uid req["mode"] = "continuous" - try: - card.Transaction(req) - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + card.Transaction(req) def get_temp_and_voltage(card): @@ -53,20 +35,13 @@ def get_temp_and_voltage(card): card (object): An instance of the Notecard class """ - temp = 0 - voltage = 0 - - try: - req = {"req": "card.temp"} - rsp = card.Transaction(req) - temp = rsp["value"] + req = {"req": "card.temp"} + rsp = card.Transaction(req) + temp = rsp["value"] - req = {"req": "card.voltage"} - rsp = card.Transaction(req) - voltage = rsp["value"] - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + req = {"req": "card.voltage"} + rsp = card.Transaction(req) + voltage = rsp["value"] return temp, voltage @@ -75,7 +50,8 @@ def run_example(product_uid, use_uart=True): """Connect to Notcard and run a transaction test.""" print("Opening port...") if use_uart: - port = busio.UART(board.TX, board.RX, baudrate=9600) + port = busio.UART(board.TX, board.RX, baudrate=9600, + receiver_buffer_size=128) else: port = busio.I2C(board.SCL, board.SDA) diff --git a/examples/notecard-basics/mpy_example.py b/examples/notecard-basics/mpy_example.py index ba0c68e..a1cee74 100644 --- a/examples/notecard-basics/mpy_example.py +++ b/examples/notecard-basics/mpy_example.py @@ -16,20 +16,6 @@ from machine import Pin -def NotecardExceptionInfo(exception): - """Construct a formatted Exception string. - - Args: - exception (Exception): An exception object. - - Returns: - string: a summary of the exception with line number and details. - """ - name = exception.__class__.__name__ - return sys.platform + ": " + name + ": " \ - + " ".join(map(str, exception.args)) - - def configure_notecard(card, product_uid): """Submit a simple JSON-based request to the Notecard. @@ -41,11 +27,7 @@ def configure_notecard(card, product_uid): req["product"] = product_uid req["mode"] = "continuous" - try: - card.Transaction(req) - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + card.Transaction(req) def get_temp_and_voltage(card): @@ -55,20 +37,13 @@ def get_temp_and_voltage(card): card (object): An instance of the Notecard class """ - temp = 0 - voltage = 0 - - try: - req = {"req": "card.temp"} - rsp = card.Transaction(req) - temp = rsp["value"] - - req = {"req": "card.voltage"} - rsp = card.Transaction(req) - voltage = rsp["value"] - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + req = {"req": "card.temp"} + rsp = card.Transaction(req) + temp = rsp["value"] + + req = {"req": "card.voltage"} + rsp = card.Transaction(req) + voltage = rsp["value"] return temp, voltage diff --git a/notecard/crc32.py b/notecard/crc32.py new file mode 100644 index 0000000..4a2d208 --- /dev/null +++ b/notecard/crc32.py @@ -0,0 +1,33 @@ +"""Module for computing the CRC32 of arbitrary data.""" + +crc32_lookup_table = [ + 0x00000000, 0x1DB71064, 0x3B6E20C8, 0x26D930AC, 0x76DC4190, 0x6B6B51F4, + 0x4DB26158, 0x5005713C, 0xEDB88320, 0xF00F9344, 0xD6D6A3E8, 0xCB61B38C, + 0x9B64C2B0, 0x86D3D2D4, 0xA00AE278, 0xBDBDF21C +] + + +def _logical_rshift(val, shift_amount, num_bits=32): + """Logcally right shift `val` by `shift_amount` bits. + + Logical right shift (i.e. right shift that fills with 0s instead of the + sign bit) isn't supported natively in Python. This is a simple + implementation. See: + https://realpython.com/python-bitwise-operators/#arithmetic-vs-logical-shift + """ + unsigned_val = val % (1 << num_bits) + return unsigned_val >> shift_amount + + +def crc32(data): + """Compute CRC32 of the given data. + + Small lookup-table half-byte CRC32 algorithm based on: + https://create.stephan-brumme.com/crc32/#half-byte + """ + crc = ~0 + for idx in range(len(data)): + crc = crc32_lookup_table[(crc ^ data[idx]) & 0x0F] ^ _logical_rshift(crc, 4) + crc = crc32_lookup_table[(crc ^ _logical_rshift(data[idx], 4)) & 0x0F] ^ _logical_rshift(crc, 4) + + return ~crc & 0xffffffff diff --git a/notecard/notecard.py b/notecard/notecard.py index d36f2b6..357a3e0 100644 --- a/notecard/notecard.py +++ b/notecard/notecard.py @@ -36,12 +36,12 @@ import time from .timeout import start_timeout, has_timed_out from .transaction_manager import TransactionManager, NoOpTransactionManager +from .crc32 import crc32 use_periphery = False use_serial_lock = False if sys.implementation.name == 'cpython' and (sys.platform == 'linux' or sys.platform == 'linux2'): - use_periphery = True from periphery import I2C @@ -54,19 +54,32 @@ class SerialLockTimeout(Exception): pass +use_i2c_lock = not use_periphery and sys.implementation.name != 'micropython' + NOTECARD_I2C_ADDRESS = 0x17 +NOTECARD_I2C_MAX_TRANSFER_DEFAULT = 255 -# The notecard is a real-time device that has a fixed size interrupt buffer. -# We can push data at it far, far faster than it can process it, -# therefore we push it in segments with a pause between each segment. +# The notecard is a real-time device that has a fixed size interrupt buffer. We +# can push data at it far, far faster than it can process it. Therefore, we push +# it in segments with a pause between each segment. CARD_REQUEST_SEGMENT_MAX_LEN = 250 -# "a 250ms delay is required to separate "segments", ~256 byte -# I2C transactions." See +# "a 250ms delay is required to separate "segments", ~256 byte I2C +# transactions." See # https://dev.blues.io/guides-and-tutorials/notecard-guides/serial-over-i2c-protocol/#data-write CARD_REQUEST_SEGMENT_DELAY_MS = 250 # "A 20ms delay is commonly used to separate smaller I2C transactions known as # 'chunks'". See the same document linked above. -I2C_CHUNK_DELAY_MS = 20 +CARD_REQUEST_I2C_CHUNK_DELAY_MS = 20 +# The delay, in miliseconds, to wait after receiving a NACK I2C. +CARD_REQUEST_I2C_NACK_WAIT_MS = 1000 +# The number of times to retry syncing up with the Notecard during a reset +# before giving up. +CARD_RESET_SYNC_RETRIES = 10 +# The time, in miliseconds, to drain incoming messages during a reset. +CARD_RESET_DRAIN_MS = 500 +CARD_INTER_TRANSACTION_TIMEOUT_SEC = 30 +CARD_INTRA_TRANSACTION_TIMEOUT_SEC = 1 +CARD_TRANSACTION_RETRIES = 5 class NoOpContextManager: @@ -88,51 +101,16 @@ def acquire(*args, **kwargs): """Acquire the no-op lock.""" return NoOpContextManager() - -def serial_lock(fn): - """Attempt to get a lock on the serial channel used for Notecard comms.""" - - def decorator(self, *args, **kwargs): - try: - with self.lock.acquire(timeout=5): - return fn(self, *args, **kwargs) - except SerialLockTimeout: - raise Exception('Notecard in use') - - return decorator - - -def i2c_lock(fn): - """Attempt to get a lock on the I2C bus used for Notecard comms.""" - - def decorator(self, *args, **kwargs): - retries = 5 - while retries != 0: - if self.lock(): - break - - retries -= 1 - # Try again after 100 ms. - time.sleep(.1) - - if retries == 0: - raise Exception('Failed to acquire I2C lock.') - - try: - ret = fn(self, *args, **kwargs) - finally: - self.unlock() - - return ret - - return decorator + def release(*args, **kwargs): + """Release the no-op lock.""" + pass class Notecard: """Base Notecard class.""" def __init__(self, debug=False): - """Configure user agent.""" + """Initialize the Notecard object.""" self._user_agent_app = None self._user_agent_sent = False self._user_agent = { @@ -147,6 +125,74 @@ def __init__(self, debug=False): self._user_agent['os_family'] = os.uname().machine self._transaction_manager = NoOpTransactionManager() self._debug = debug + self._last_request_seq_number = 0 + self._card_supports_crc = False + self._reset_required = True + + def _crc_add(self, req_string, seq_number): + """Add a CRC field to the request. + + The CRC field also contains a sequence number and has this format: + + "crc":"SSSS:CCCCCCCC" + + SSSS is the sequence number encoded as a string of 4 hex digits. + CCCCCCCC is the CRC32 encoded as a string of 8 hex digits. + """ + req_bytes = req_string.encode('utf-8') + crc_hex = '{:08x}'.format(crc32(req_bytes)) + seq_number_hex = '{:04x}'.format(seq_number) + crc_field = f'"crc":"{seq_number_hex}:{crc_hex}"' + req_string_w_crc = req_string[:-1] + if req_string[-2] == '{': + req_string_w_crc += f'{crc_field}' + else: + req_string_w_crc += f',{crc_field}' + req_string_w_crc += '}' + + return req_string_w_crc + + def _crc_error(self, rsp_bytes): + """Check the CRC in a Notecard response.""" + rsp_json = json.loads(rsp_bytes) + if 'crc' not in rsp_json: + # If there's not a 'crc' field in the response, it's only an error + # if the Notecard supports CRC. + return self._card_supports_crc + + self._card_supports_crc = True + + # Extract the sequence number and CRC. We do all this via string + # operations instead of decoding the JSON. In Python, numbers with long + # decimal parts (e.g. 10.11111111111111123445522123) get truncated in + # the decoding process. When re-encoded, the string representation is + # also truncated, and so the CRC will be computed over a different + # string than was originally sent, resulting in a CRC error. + seq_number, crc = rsp_json['crc'].split(':') + # Remove the 'crc' field from the response. + rsp_str = rsp_bytes.decode() + rsp_str_crc_removed = rsp_str.split('"crc":')[0] + if rsp_str_crc_removed[-1] == ',': + rsp_str_crc_removed = rsp_str_crc_removed[:-1] + '}' + else: + rsp_str_crc_removed = rsp_str_crc_removed.rstrip() + '}' + + # Compute the CRC over the response, with the 'crc' field removed. + bytes_for_crc = rsp_str_crc_removed.encode('utf-8') + computed_crc = '{:08x}'.format(crc32(bytes_for_crc)).upper() + expected_seq_number = '{:04x}'.format(self._last_request_seq_number) + + if seq_number != expected_seq_number: + if self._debug: + print('Sequence number mismatch. Expected ' + \ + f'{expected_seq_number}, received {seq_number}.') + return True + elif crc != computed_crc: + if self._debug: + print(f'CRC error. Computed {computed_crc}, received {crc}.') + return True + + return False def _prepare_request(self, req): """Prepare a request for transmission to the Notecard.""" @@ -158,34 +204,185 @@ def _prepare_request(self, req): self._user_agent_sent = True - # Serialize the JSON request to a string. - req_string = json.dumps(req) + rsp_expected = 'req' in req + + # If this is a request and not a command, add a CRC. + req_string = json.dumps(req, separators=(',', ':')) + if rsp_expected: + req_string = self._crc_add(req_string, + self._last_request_seq_number) + + # Serialize the JSON request to a string, removing any unnecessary + # whitespace. if self._debug: print(req_string) req_string += "\n" # Encode the request string as UTF-8 bytes. - return req_string.encode('utf-8') + return (req_string.encode('utf-8'), rsp_expected) + + def _transaction_timeout_seconds(self, req): + """Determine the timeout to use, in seconds, for the transaction. + + When note.add or web.* requests are used to transfer binary data, the + time to complete the transaction varies depending on the size of the + payload and network conditions. Therefore, it's possible for these + transactions to timeout prematurely. + + This method does the following: + - If the request is a `note.add`, set the timeout value to the + value of the "milliseconds" parameter, if it exists. If it + doesn't, use the "seconds" parameter. If that doesn't exist, + use the standard timeout of `CARD_INTER_TRANSACTION_TIMEOUT_SEC`. + - If the request is a `web.*`, follow the same logic, but instead + of using the standard timeout, use 90 seconds for all `web.*` + transactions. + """ + timeout_secs = CARD_INTER_TRANSACTION_TIMEOUT_SEC + if 'req' in req: + req_key = 'req' + elif 'cmd' in req: + req_key = 'cmd' + else: + raise Exception('Malformed request. Missing \'req\' or \'cmd\' ' + \ + f'field: {req}.') + + if req[req_key] == 'note.add': + if 'milliseconds' in req: + timeout_secs = req['milliseconds'] / 1000 + elif 'seconds' in req: + timeout_secs = req['seconds'] + elif 'web.' in req[req_key]: + if 'milliseconds' in req: + timeout_secs = req['milliseconds'] / 1000 + elif 'seconds' in req: + timeout_secs = req['seconds'] + else: + timeout_secs = 90 - def Command(self, req): - """Send a command to the Notecard. The Notecard response is ignored.""" - if 'cmd' not in req: - raise Exception("Please use 'cmd' instead of 'req'") + if self._debug: + print(f'Using transaction timeout of {timeout_secs} seconds.') - req_bytes = self._prepare_request(req) - self._transact(req_bytes, False) + return timeout_secs - def Transaction(self, req): - """Perform a Notecard transaction and return the result.""" - req_bytes = self._prepare_request(req) - rsp_bytes = self._transact(req_bytes, True) - rsp_json = json.loads(rsp_bytes) - if self._debug: + def Transaction(self, req, lock=True): + """Send a request to the Notecard and read back a response. + + If the request is a command (indicated by using 'cmd' in the request + instead of 'req'), don't return a response. + + The underlying transport channel (serial or I2C) is locked for the + duration of the request and response if `lock` is True. + """ + rsp_json = None + timeout_secs = self._transaction_timeout_seconds(req) + req_bytes, rsp_expected = self._prepare_request(req) + + if self._reset_required: + self.Reset() + + try: + self._transaction_manager.start(CARD_INTER_TRANSACTION_TIMEOUT_SEC) + if lock: + self.lock() + + retries_left = CARD_TRANSACTION_RETRIES + error = False + if rsp_expected: + while retries_left > 0: + try: + rsp_bytes = self._transact( + req_bytes, rsp_expected=True, + timeout_secs=timeout_secs) + except Exception as e: + if self._debug: + print(e) + + error = True + self.Reset() + retries_left -= 1 + time.sleep(0.5) + continue + + if self._crc_error(rsp_bytes): + if self._debug: + print('CRC error on response from Notecard.') + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + + try: + rsp_json = json.loads(rsp_bytes) + except Exception as e: + if self._debug: + print(e) + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + + if 'err' in rsp_json: + if '{io}' in rsp_json['err']: + if self._debug: + print('Response has error field indicating ' + \ + f'I/O error: {rsp_json}') + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + elif '{bad-bin}' in rsp_json['err']: + if self._debug: + print('Response has error field indicating ' + \ + f'binary I/O error: {rsp_json}') + print('Not eligible for retry.') + + error = True + break + + error = False + break + else: + try: + self._transact(req_bytes, rsp_expected=False, + timeout_secs=timeout_secs) + except Exception as e: + error = True + if self._debug: + print(e) + + self._last_request_seq_number += 1 + + if error: + self._reset_required = True + raise Exception('Failed to transact with Notecard.') + + finally: + if lock: + self.unlock() + + self._transaction_manager.stop() + + if self._debug and rsp_json is not None: print(rsp_json) return rsp_json + def Command(self, req): + """Send a command to the Notecard. + + Unlike `Transaction`, `Command` doesn't return a response from the + Notecard. + """ + if 'cmd' not in req: + raise Exception("Please use 'cmd' instead of 'req'") + + self.Transaction(req) + def GetUserAgent(self): """Return the User Agent String for the host for debug purposes.""" ua_copy = self._user_agent.copy() @@ -208,96 +405,156 @@ def SetTransactionPins(self, rtx_pin, ctx_pin): class OpenSerial(Notecard): """Notecard class for Serial communication.""" - @serial_lock - def _transact(self, req, rsp_expected): - """Perform a low-level transaction with the Notecard.""" - rsp = None + def _transact(self, req_bytes, rsp_expected, + timeout_secs=CARD_INTER_TRANSACTION_TIMEOUT_SEC): + self.transmit(req_bytes) - try: - transaction_timeout_secs = 30 - self._transaction_manager.start(transaction_timeout_secs) - - seg_off = 0 - seg_left = len(req) - while seg_left > 0: - seg_len = seg_left - if seg_len > CARD_REQUEST_SEGMENT_MAX_LEN: - seg_len = CARD_REQUEST_SEGMENT_MAX_LEN - - self.uart.write(req[seg_off:seg_off + seg_len]) - seg_off += seg_len - seg_left -= seg_len - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + if not rsp_expected: + return - if rsp_expected: - rsp = self.uart.readline() - finally: - self._transaction_manager.stop() + start = start_timeout() + while not self._available(): + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception('Timed out while querying Notecard for ' + \ + 'available data.') - return rsp + # Delay for 10 ms before checking for available data again. + time.sleep(.01) - def _read_byte_micropython(self): - """Read a single byte from the Notecard (MicroPython).""" - if not self.uart.any(): - return None - return self.uart.read(1) + return self.receive() - def _read_byte_cpython(self): - """Read a single byte from the Notecard (CPython).""" - if self.uart.in_waiting == 0: - return None - return self.uart.read(1) + def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, + delay=True): + """Read a newline-terminated batch of data from the Notecard.""" + data = bytearray() + received_newline = False + start = start_timeout() + + while not received_newline: + while not self._available(): + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception('Timed out waiting to receive data from' + \ + ' Notecard.') + + # Sleep while awaiting the first byte (lazy). After the first + # byte, start to spin for the remaining bytes (greedy). + if delay and len(data) == 0: + time.sleep(.001) + + timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC + start = start_timeout() + byte = self._read_byte() + data.extend(byte) + received_newline = byte == b'\n' + + return data + + def transmit(self, data, delay=True): + """Send `data` to the Notecard.""" + seg_off = 0 + seg_left = len(data) + + while seg_left > 0: + seg_len = seg_left + if seg_len > CARD_REQUEST_SEGMENT_MAX_LEN: + seg_len = CARD_REQUEST_SEGMENT_MAX_LEN + + self.uart.write(data[seg_off:seg_off + seg_len]) + seg_off += seg_len + seg_left -= seg_len - def _read_byte_circuitpython(self): - """Read a single byte from the Notecard (CircuitPython).""" + if delay: + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + + def _available_micropython(self): + return self.uart.any() + + def _available_default(self): + return self.uart.in_waiting > 0 + + def _read_byte(self): + """Read a single byte from the Notecard.""" return self.uart.read(1) - @serial_lock def Reset(self): """Reset the Notecard.""" + if self._debug: + print('Resetting Notecard serial communications.') + + # Delay to give the Notecard a chance to process any segment sent prior + # to the coming reset sequence. + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + notecard_ready = False - for i in range(10): - try: - # Send a newline to the Notecard to terminate any partial - # request that might be sitting in its input buffer. - self.uart.write(b'\n') - except: - # Wait 500 ms and before trying to send the newline again. - time.sleep(.5) - continue + try: + self.lock() + + for i in range(CARD_RESET_SYNC_RETRIES): + try: + # Send a newline to the Notecard to terminate any partial + # request that might be sitting in its input buffer. + self.uart.write(b'\n') + except Exception as e: + if self._debug: + print(e) + # Wait CARD_RESET_DRAIN_MS and before trying to send the + # newline again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + continue + + something_found = False + non_control_char_found = False + # Drain serial for 500 ms. + start = start_timeout() + while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): + while self._available(): + something_found = True + data = self._read_byte() + if data[0] != ord('\n') and data[0] != ord('\r'): + non_control_char_found = True + # Reset the timer with each non-control character. + start = start_timeout() + + # If there was no data read from the Notecard, wait 1 ms and + # try again. Keep doing this for CARD_RESET_DRAIN_MS. + time.sleep(.001) + + if not something_found: + if self._debug: + print('Notecard not responding to newline during ' + \ + 'reset.') + + elif non_control_char_found: + if self._debug: + print('Received non-control characters from the ' + \ + 'Notecard during reset.') + else: + # If all we got back is newlines, we're in sync with the + # Notecard. + notecard_ready = True + break + + if self._debug: + print('Retrying reset...') + + # Wait CARD_RESET_DRAIN_MS before trying again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + + if not notecard_ready: + raise Exception('Failed to reset Notecard.') - something_found = False - non_control_char_found = False - # Drain serial for 500 ms. - start = start_timeout() - while not has_timed_out(start, 0.5): - data = self._read_byte() - # If data was read from the Notecard, inspect what we received. - # If it isn't a \n or \r, the host and the Notecard aren't - # synced up yet, and we'll need to retransmit the \n and try - # again. - while data is not None and data != b'': - something_found = True - if data[0] != ord('\n') and data[0] != ord('\r'): - non_control_char_found = True - - data = self._read_byte() - - # If there was no data read from the Notecard, wait 1 ms and try - # again. Keep doing this for 500 ms. - time.sleep(.001) - - # If we received anything other than newlines from the Notecard, we - # aren't in sync, yet. - if something_found and not non_control_char_found: - notecard_ready = True - break + finally: + self.unlock() - # Wait 500 ms before trying again. - time.sleep(.5) + self._reset_required = False - if not notecard_ready: - raise Exception('Failed to reset Notecard.') + def lock(self): + """Lock access to the serial bus.""" + self.lock_handle.acquire(timeout=5) + + def unlock(self): + """Unlock access to the serial bus.""" + self.lock_handle.release() def __init__(self, uart_id, debug=False): """Initialize the Notecard before a reset.""" @@ -308,18 +565,19 @@ def __init__(self, uart_id, debug=False): self.uart = uart_id if use_serial_lock: - self.lock = FileLock('serial.lock') + self.lock_handle = FileLock('serial.lock') else: - self.lock = NoOpSerialLock() + self.lock_handle = NoOpSerialLock() if sys.implementation.name == 'micropython': - self._read_byte = self._read_byte_micropython - elif sys.implementation.name == 'cpython': - self._read_byte = self._read_byte_cpython - elif sys.implementation.name == 'circuitpython': - self._read_byte = self._read_byte_circuitpython + self._available = self._available_micropython else: - raise NotImplementedError(f'Unsupported platform: {sys.implementation.name}') + if hasattr(self.uart, 'in_waiting'): + self._available = self._available_default + else: + raise NotImplementedError('Serial communications with the ' + \ + 'Notecard are not supported for ' + \ + 'this platform.') self.Reset() @@ -327,36 +585,8 @@ def __init__(self, uart_id, debug=False): class OpenI2C(Notecard): """Notecard class for I2C communication.""" - def _write(self, data): - write_length = bytearray(1) - write_length[0] = len(data) - - # Send a message with the length of the incoming bytes followed - # by the bytes themselves. - self._platform_write(write_length, data) - - def _transmit(self, data): - chunk_offset = 0 - data_left = len(data) - sent_in_seg = 0 - - while data_left > 0: - chunk_len = min(data_left, self.max) - write_data = data[chunk_offset:chunk_offset + chunk_len] - - self._write(write_data) - - chunk_offset += chunk_len - data_left -= chunk_len - sent_in_seg += chunk_len - - if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: - sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) - - time.sleep(I2C_CHUNK_DELAY_MS / 1000) - def _read(self, length): + """Perform a serial-over-I2C read.""" initiate_read = bytearray(2) # 0 indicates we are reading from the Notecard. initiate_read[0] = 0 @@ -366,116 +596,257 @@ def _read(self, length): # length accounts for the payload and the +2 is for the header. The # header sent by the Notecard has one byte to indicate the number of # bytes still available to read and a second byte to indicate the number - # of bytes coming in the current chunk. + # of bytes coming in the current packet. read_buf = bytearray(length + 2) - return self._platform_read(initiate_read, read_buf) + self._platform_read(initiate_read, read_buf) + # First two bytes are the header. + header = read_buf[0:2] + # The number of bytes still available to read after this packet. + available = header[0] + # The number of data bytes in this packet. + data_len = header[1] + # The rest is the data. + data = read_buf[2:] + + if len(data) != data_len: + raise Exception('Serial-over-I2C error: reported data length ' + \ + f'({data_len}) differs from actual data length' + \ + f' ({len(data)}).') + + return available, data - def _receive(self, timeout_secs, chunk_delay_secs, wait_for_newline): - chunk_len = 0 + def _write(self, data): + """Perform a serial-over-I2C write.""" + self._platform_write(bytearray([len(data)]), data) + + def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, + delay=True): + """Read a newline-terminated batch of data from the Notecard.""" + read_len = 0 received_newline = False + timeout_secs = CARD_INTER_TRANSACTION_TIMEOUT_SEC start = start_timeout() - read_data = bytearray() + received_data = bytearray() while True: - read_buf = self._read(chunk_len) - - # The number of bytes still available to read. - num_bytes_available = read_buf[0] - # The number of bytes in this chunk. - num_bytes_this_chunk = read_buf[1] - if num_bytes_this_chunk > 0: - read_data += read_buf[2:2 + num_bytes_this_chunk] - received_newline = read_buf[-1] == ord('\n') - - chunk_len = min(num_bytes_available, self.max) - # Keep going if there's still byte available to read, even if + available, data = self._read(read_len) + if len(data) > 0: + received_data += data + + timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC + start = start_timeout() + + if not received_newline: + received_newline = data[-1] == ord('\n') + + read_len = min(available, self.max) + # Keep going if there are still bytes available to read, even if # we've received a newline. - if chunk_len > 0: + if available > 0: continue - # Otherwise, if there's no bytes available to read and we either + # Otherwise, if there are no bytes available to read and we either # 1) don't care about waiting for a newline or 2) do care and # received the newline, we're done. - if not wait_for_newline or received_newline: + if received_newline: break - # Delay between reading chunks. Note that as long as bytes are - # available to read (i.e. chunk_len > 0), we don't delay here, nor - # do we check the timeout below. This is intentional and mimics the - # behavior of other SDKs (e.g. note-c). - time.sleep(chunk_delay_secs) - if timeout_secs != 0 and has_timed_out(start, timeout_secs): - raise Exception("Timed out while reading data from the Notecard.") + raise Exception('Timed out while reading data from the ' + \ + 'Notecard.') - return read_data + if delay: + time.sleep(0.05) - @i2c_lock - def _transact(self, req, rsp_expected): - """Perform a low-level transaction with the Notecard.""" - rsp = None + return received_data - try: - transaction_timeout_secs = 30 - self._transaction_manager.start(transaction_timeout_secs) + def transmit(self, data, delay=True): + """Send `data` to the Notecard.""" + chunk_offset = 0 + data_left = len(data) + sent_in_seg = 0 - self._transmit(req) + while data_left > 0: + # Delay for 5ms. This prevents a fast host from hammering a + # slow/busy Notecard with requests. + time.sleep(.005) - if rsp_expected: - rsp = self._receive(30, 0.05, True) - finally: - self._transaction_manager.stop() + chunk_len = min(data_left, self.max) + write_data = data[chunk_offset:chunk_offset + chunk_len] + self._write(write_data) - return rsp + chunk_offset += chunk_len + data_left -= chunk_len + sent_in_seg += chunk_len + + # We delay for CARD_REQUEST_SEGMENT_DELAY_MS ms every time a full + # "segment" of data has been transmitted. + if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: + sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN + + if delay: + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + + if delay: + time.sleep(CARD_REQUEST_I2C_CHUNK_DELAY_MS / 1000) + + def _transact(self, req_bytes, rsp_expected, + timeout_secs=CARD_INTER_TRANSACTION_TIMEOUT_SEC): + self.transmit(req_bytes) + + if not rsp_expected: + return + + # Delay for 5ms. This prevents a fast host from hammering a slow/busy + # Notecard with requests. + time.sleep(0.005) + + start = start_timeout() + available = 0 + while available == 0: + available, _ = self._read(0) + + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception('Timed out while querying Notecard for ' + \ + 'available data.') + + return self.receive() - @i2c_lock def Reset(self): """Reset the Notecard.""" - # Send a newline to the Notecard to terminate any partial request that - # might be sitting in its input buffer. - self._transmit(b'\n') + if self._debug: + print('Resetting Notecard I2C communications.') - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + notecard_ready = False + try: + self.lock() + + for i in range(CARD_RESET_SYNC_RETRIES): + try: + # Send a newline to the Notecard to terminate any partial + # request that might be sitting in its input buffer. + self._write(b'\n') + except Exception as e: + if self._debug: + print(e) + time.sleep(CARD_REQUEST_I2C_NACK_WAIT_MS / 1000) + continue - # Read from the Notecard until there's nothing left, retrying a max of 3 - # times. - retries = 3 - while retries > 0: - try: - self._receive(0, .001, False) - except: - retries -= 1 - else: - break + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) - if retries == 0: + something_found = False + non_control_char_found = False + + start = start_timeout() + read_len = 0 + while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): + try: + available, data = self._read(read_len) + except Exception as e: + if self._debug: + print(e) + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + continue + + if len(data) > 0: + something_found = True + # The Notecard responds to a bare `\n` with `\r\n`. If + # we get any other characters back, it means the host + # and Notecard aren't synced up yet, and we need to + # transmit `\n` again. + for byte in data: + if byte != ord('\n') and byte != ord('\r'): + non_control_char_found = True + # Reset the timer with each non-control + # character. + start = start_timeout() + + read_len = min(available, self.max) + + time.sleep(CARD_REQUEST_I2C_CHUNK_DELAY_MS / 1000) + + if not something_found: + if self._debug: + print('Notecard not responding to newline during ' + \ + 'reset.') + time.sleep(.005) + elif non_control_char_found: + if self._debug: + print('Received non-control characters from the ' + \ + 'Notecard during reset.') + else: + # If all we got back is newlines, we're in sync with the + # Notecard. + notecard_ready = True + break + + if self._debug: + print('Retrying reset...') + + # Wait CARD_RESET_DRAIN_MS before trying again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + finally: + self.unlock() + + if not notecard_ready: raise Exception('Failed to reset Notecard.') - def _linux_write(self, length, data): + self._reset_required = False + + def _cpython_write(self, length, data): # noqa: D403 + """CPython implementation of serial-over-I2C write.""" msgs = [I2C.Message(length + data)] self.i2c.transfer(self.addr, msgs) - def _non_linux_write(self, length, data): + def _non_cpython_write(self, length, data): + """Non-CPython implementation of serial-over-I2C write.""" self.i2c.writeto(self.addr, length + data) - def _linux_read(self, initiate_read_msg, read_buf): - msgs = [I2C.Message(initiate_read_msg), I2C.Message(read_buf, read=True)] + def _cpython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """CPython implementation of serial-over-I2C read.""" + msgs = [ + I2C.Message(initiate_read_msg), + I2C.Message(read_buf, read=True) + ] self.i2c.transfer(self.addr, msgs) - read_buf = msgs[1].data - - return read_buf + read_bytes = msgs[1].data + read_buf[:len(read_bytes)] = read_bytes - def _micropython_read(self, initiate_read_msg, read_buf): + def _micropython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """MicroPython implementation of serial-over-I2C read.""" self.i2c.writeto(self.addr, initiate_read_msg, False) self.i2c.readfrom_into(self.addr, read_buf) - return read_buf - - def _circuitpython_read(self, initiate_read_msg, read_buf): + def _circuitpython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """CircuitPython implementation of serial-over-I2C read.""" self.i2c.writeto_then_readfrom(self.addr, initiate_read_msg, read_buf) - return read_buf + def lock(self): + """Lock access to the I2C bus.""" + retries = 5 + while retries != 0: + if self.lock_fn(): + break + + retries -= 1 + # Try again after 100 ms. + time.sleep(.1) + + if retries == 0: + raise Exception('Failed to acquire I2C lock.') + + def unlock(self): + """Unlock access to the I2C bus.""" + self.unlock_fn() + + def _i2c_no_op_try_lock(*args, **kwargs): + """No-op lock function.""" + return True + + def _i2c_no_op_unlock(*args, **kwargs): + """No-op unlock function.""" + pass def __init__(self, i2c, address, max_transfer, debug=False): """Initialize the Notecard before a reset.""" @@ -485,41 +856,30 @@ def __init__(self, i2c, address, max_transfer, debug=False): self.i2c = i2c - def i2c_no_op_try_lock(*args, **kwargs): - """No-op lock function.""" - return True - - def i2c_no_op_unlock(*args, **kwargs): - """No-op unlock function.""" - pass - - use_i2c_lock = not use_periphery and sys.implementation.name != 'micropython' if use_i2c_lock: - self.lock = self.i2c.try_lock - self.unlock = self.i2c.unlock + self.lock_fn = self.i2c.try_lock + self.unlock_fn = self.i2c.unlock else: - self.lock = i2c_no_op_try_lock - self.unlock = i2c_no_op_unlock + self.lock_fn = self._i2c_no_op_try_lock + self.unlock_fn = self._i2c_no_op_unlock if address == 0: self.addr = NOTECARD_I2C_ADDRESS else: self.addr = address if max_transfer == 0: - self.max = 255 + self.max = NOTECARD_I2C_MAX_TRANSFER_DEFAULT else: self.max = max_transfer - if use_periphery: - self._platform_write = self._linux_write - self._platform_read = self._linux_read - elif sys.implementation.name == 'micropython': - self._platform_write = self._non_linux_write + if sys.implementation.name == 'micropython': + self._platform_write = self._non_cpython_write self._platform_read = self._micropython_read elif sys.implementation.name == 'circuitpython': - self._platform_write = self._non_linux_write + self._platform_write = self._non_cpython_write self._platform_read = self._circuitpython_read else: - raise NotImplementedError(f'Unsupported platform: {sys.implementation.name}') + self._platform_write = self._cpython_write + self._platform_read = self._cpython_read self.Reset() diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_i2c.py b/test/test_i2c.py new file mode 100644 index 0000000..acbd46f --- /dev/null +++ b/test/test_i2c.py @@ -0,0 +1,479 @@ +import os +import sys +import pytest +import re +from unittest.mock import MagicMock, patch +from .unit_test_utils import TrueOnNthIteration, BooleanToggle + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import notecard # noqa: E402 + + +@pytest.fixture +def arrange_test(): + def _arrange_test(address=0, max_transfer=0, debug=False, + mock_locking=True): + # OpenI2C's __init__ will call Reset, which we don't care about + # actually doing here, so we mock Reset. + with patch('notecard.notecard.OpenI2C.Reset'): + card = notecard.OpenI2C(MagicMock(), address, max_transfer, + debug) + + if mock_locking: + card.lock = MagicMock() + card.unlock = MagicMock() + + return card + + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_test + + +@pytest.fixture +def arrange_reset_test(arrange_test): + def _arrange_reset_test(): + card = arrange_test() + card._write = MagicMock() + card._read = MagicMock() + + return card + + yield _arrange_reset_test + + +@pytest.fixture +def arrange_transact_test(arrange_test): + def _arrange_transact_test(): + card = arrange_test() + card.transmit = MagicMock() + card.receive = MagicMock() + req_bytes = card._prepare_request({'req': 'card.version'}) + + return card, req_bytes + + yield _arrange_transact_test + + +@pytest.fixture +def arrange_read_test(arrange_test): + def _arrange_read_test(available, data_len, data): + def _platform_read_side_effect(initiate_read_msg, read_buf): + read_buf[0] = available + read_buf[1] = data_len + read_buf[2:] = data + + card = arrange_test() + card._platform_read = MagicMock( + side_effect=_platform_read_side_effect) + + return card + + yield _arrange_read_test + + +class TestI2C: + # Reset tests. + def test_reset_succeeds_on_good_notecard_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + assert not card._reset_required + + def test_reset_sends_a_newline_to_clear_stale_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card._write.assert_called_once_with(b'\n') + + def test_reset_locks_and_unlocks(self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_unlocks_after_exception(self, arrange_reset_test): + card = arrange_reset_test() + card._write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_fails_if_continually_reads_non_control_chars( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (1, 1, b'h') + + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + def test_reset_required_if_reset_fails(self, arrange_reset_test): + card = arrange_reset_test() + card._write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + assert card._reset_required + + # __init__ tests. + def test_init_calls_reset(self): + with patch('notecard.notecard.OpenI2C.Reset') as reset_mock: + notecard.OpenI2C(MagicMock(), 0, 0) + + reset_mock.assert_called_once() + + @pytest.mark.parametrize( + 'addr_param,expected_addr', + [ + (0, notecard.NOTECARD_I2C_ADDRESS), + (7, 7) + ] + ) + def test_init_sets_address_correctly( + self, addr_param, expected_addr, arrange_test): + card = arrange_test(address=addr_param) + + assert card.addr == expected_addr + + @pytest.mark.parametrize( + 'max_param,expected_max', + [ + (0, notecard.NOTECARD_I2C_MAX_TRANSFER_DEFAULT), + (7, 7) + ] + ) + def test_init_sets_max_transfer_correctly( + self, max_param, expected_max, arrange_test): + card = arrange_test(max_transfer=max_param) + + assert card.max == expected_max + + @pytest.mark.parametrize('debug_param', [False, True]) + def test_init_sets_debug_correctly(self, debug_param, arrange_test): + card = arrange_test(debug=debug_param) + + assert card._debug == debug_param + + @pytest.mark.parametrize('use_i2c_lock', [False, True]) + def test_init_uses_appropriate_locking_functions( + self, use_i2c_lock, arrange_test): + with patch('notecard.notecard.use_i2c_lock', new=use_i2c_lock): + card = arrange_test() + + if use_i2c_lock: + assert card.lock_fn == card.i2c.try_lock + assert card.unlock_fn == card.i2c.unlock + else: + assert card.lock_fn.__func__ == \ + notecard.OpenI2C._i2c_no_op_try_lock + assert card.unlock_fn.__func__ == \ + notecard.OpenI2C._i2c_no_op_unlock + + @pytest.mark.parametrize( + 'platform,write_method,read_method', + [ + ( + 'micropython', + notecard.OpenI2C._non_cpython_write, + notecard.OpenI2C._micropython_read + ), + ( + 'circuitpython', + notecard.OpenI2C._non_cpython_write, + notecard.OpenI2C._circuitpython_read + ), + ( + 'cpython', + notecard.OpenI2C._cpython_write, + notecard.OpenI2C._cpython_read + ), + ] + ) + def test_init_sets_platform_hooks_correctly( + self, platform, write_method, read_method, arrange_test): + with patch('notecard.notecard.sys.implementation.name', new=platform): + card = arrange_test() + + assert card._platform_write.__func__ == write_method + assert card._platform_read.__func__ == read_method + + def test_user_agent_indicates_i2c_after_init(self, arrange_test): + card = arrange_test() + userAgent = card.GetUserAgent() + + assert userAgent['req_interface'] == 'i2c' + assert userAgent['req_port'] is not None + + # receive tests. + def test_receive_returns_all_data_bytes_from_read(self, arrange_test): + card = arrange_test() + payload = b'{}\r\n' + card._read = MagicMock() + card._read.side_effect = [ + # There are 4 bytes available to read, and there are no more bytes + # to read in this packet. + (4, bytearray()), + # 0 bytes available to read after this packet. 4 coming in this + # packet, and they are {}\r\n. + (0, payload) + ] + + rx_data = card.receive() + + assert rx_data == payload + + def test_receive_keeps_reading_if_data_available_after_newline( + self, arrange_test): + card = arrange_test() + payload = b'{}\r\n' + excess_data = b'io' + card._read = MagicMock() + card._read.side_effect = [ + # There are 4 bytes available to read, and there are no more bytes + # to read in this packet. + (4, bytearray()), + # 2 bytes available to read after this packet. 4 coming in this + # packet, and they are {}\r\n. + (2, payload), + # 0 bytes after this packet. 2 coming in this packet, and they are + # io. + (0, excess_data) + ] + + rx_data = card.receive() + + assert rx_data == (payload + excess_data) + + def test_receive_raises_exception_on_timeout(self, arrange_test): + card = arrange_test() + payload = b'{}\r' + card._read = MagicMock() + card._read.side_effect = [ + # There are 3 bytes available to read, and there are no more bytes + # to read in this packet. + (3, bytearray()), + # 0 bytes available to read after this packet. 3 coming in this + # packet, and they are {}\r. The lack of a newline at the end will + # cause this test to hit the timeout. + (0, payload) + ] + + with patch('notecard.notecard.has_timed_out', return_value=True): + with pytest.raises(Exception, match=('Timed out while reading ' + 'data from the Notecard.')): + card.receive() + + # transmit tests. + def test_transmit_writes_all_data_bytes(self, arrange_test): + card = arrange_test() + # Create a bytearray to transmit. It should be larger than a single I2C + # chunk (i.e. greater than card.max), and it should not fall neatly onto + # a segment boundary. + data_len = card.max * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + write_mock = MagicMock() + card._write = write_mock + + card.transmit(data) + + # Using the argument history of the _write mock, assemble a bytearray of + # the data passed to write. + written = bytearray() + for write_call in write_mock.call_args_list: + segment = write_call[0][0] + written += segment + + # Verify that all the data we passed to transmit was in fact passed to + # uart.write. + assert data == written + + def test_transmit_does_not_exceed_max_transfer_size(self, arrange_test): + card = arrange_test() + # Create a bytearray to transmit. It should be larger than a single + # I2C chunk (i.e. greater than card.max), and it should not fall neatly + # onto a segment boundary. + data_len = card.max * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + write_mock = MagicMock() + card._write = write_mock + + card.transmit(data) + + for write_call in write_mock.call_args_list: + assert len(write_call[0][0]) <= card.max + + # _transact tests. + def test_transact_calls_transmit_with_req_bytes( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + card._transact(req_bytes, rsp_expected=False) + + card.transmit.assert_called_once_with(req_bytes) + + def test_transact_returns_none_if_rsp_not_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + rsp = card._transact(req_bytes, rsp_expected=False) + + assert rsp is None + + def test_transact_returns_not_none_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(1, bytearray())) + + rsp = card._transact(req_bytes, rsp_expected=True) + + assert rsp is not None + + def test_transact_calls_receive_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(1, bytearray())) + + card._transact(req_bytes, rsp_expected=True) + + card.receive.assert_called_once() + + def test_transact_raises_exception_on_timeout(self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(0, bytearray())) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, + match=('Timed out while querying Notecard for ' + 'available data.')): + card._transact(req_bytes, rsp_expected=True) + + # _read tests. + def test_read_sends_the_initial_read_packet_correctly( + self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) + # To start a read from the Notecard using serial-over-I2C, the host + # should send a 0 byte followed by a byte with the requested read + # length. + expected_packet = bytearray(2) + expected_packet[0] = 0 + expected_packet[1] = data_len + + card._read(data_len) + + card._platform_read.assert_called_once() + assert card._platform_read.call_args[0][0] == expected_packet + + def test_read_sizes_read_buf_correctly(self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) + header_len = 2 + expected_read_buffer_len = header_len + data_len + + card._read(data_len) + + card._platform_read.assert_called_once() + assert len(card._platform_read.call_args[0][1]) == \ + expected_read_buffer_len + + def test_read_parses_data_correctly(self, arrange_read_test): + available = 8 + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) + + actual_available, actual_data = card._read(len(data)) + + card._platform_read.assert_called_once() + assert actual_available == available + assert actual_data == data + + def test_read_raises_exception_if_data_length_does_not_match_data( + self, arrange_read_test): + available = 8 + # The reported length is 5, but the actual length is 4. + data_len = 5 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) + + exception_msg = re.escape(('Serial-over-I2C error: reported data length' + f' ({data_len}) differs from actual data ' + f'length ({len(data)}).')) + with pytest.raises(Exception, match=exception_msg): + card._read(len(data)) + + # _write tests. + def test_write_calls_platform_write_correctly(self, arrange_test): + card = arrange_test() + card._platform_write = MagicMock() + data = bytearray([0xDE, 0xAD, 0xBE, 0xEF]) + + card._write(data) + + card._platform_write.assert_called_once_with( + bytearray([len(data)]), data) + + # lock tests. + def test_lock_calls_lock_fn(self, arrange_test): + card = arrange_test(mock_locking=False) + card.lock_fn = MagicMock(return_value=True) + + card.lock() + + card.lock_fn.assert_called() + + def test_lock_retries_lock_fn_if_needed(self, arrange_test): + card = arrange_test(mock_locking=False) + # Fails the first time and succeeds the second time. + card.lock_fn = MagicMock(side_effect=[False, True]) + + card.lock() + + assert card.lock_fn.call_count == 2 + + def test_lock_raises_exception_if_lock_fn_never_returns_true( + self, arrange_test): + card = arrange_test(mock_locking=False) + card.lock_fn = MagicMock(return_value=False) + + with pytest.raises(Exception, match='Failed to acquire I2C lock.'): + card.lock() + + # unlock tests. + def test_unlock_calls_unlock_fn(self, arrange_test): + card = arrange_test(mock_locking=False) + card.unlock_fn = MagicMock() + + card.unlock() + + card.unlock_fn.assert_called() diff --git a/test/test_notecard.py b/test/test_notecard.py index 7bcd4b5..4cdbb8d 100644 --- a/test/test_notecard.py +++ b/test/test_notecard.py @@ -1,46 +1,413 @@ import os import sys import pytest -from unittest.mock import Mock, MagicMock, patch -import periphery +from unittest.mock import MagicMock, patch import json +import re sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import notecard # noqa: E402 +from notecard.transaction_manager import TransactionManager, NoOpTransactionManager # noqa: E402 -def get_serial_and_port(): - serial = Mock() # noqa: F811 - port = serial.Serial("/dev/tty.foo", 9600) - port.read.side_effect = [b'\r', b'\n', None] - port.readline.return_value = "\r\n" - - # Patch the Reset method so that we don't actually call it during __init__. - with patch('notecard.notecard.OpenSerial.Reset'): - nCard = notecard.OpenSerial(port) - - return (nCard, port) - - -def get_i2c_and_port(): - periphery = Mock() # noqa: F811 - port = periphery.I2C("dev/i2c-foo") - port.try_lock.return_value = True - - # Patch the Reset method so that we don't actually call it during __init__. - with patch('notecard.notecard.OpenI2C.Reset'): - nCard = notecard.OpenI2C(port, 0x17, 255) - - return (nCard, port) - - -class NotecardTest: - +@pytest.fixture +def arrange_transaction_test(): + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + def _arrange_transaction_test(): + card = notecard.Notecard() + card.Reset = MagicMock() + card.lock = MagicMock() + card.unlock = MagicMock() + card._transact = MagicMock(return_value=b'{}\r\n') + card._crc_error = MagicMock(return_value=False) + + return card + + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_transaction_test + + +class TestNotecard: + # _transaction_manager tests. + def test_txn_manager_is_no_op_before_pins_set(self): + card = notecard.Notecard() + + assert isinstance(card._transaction_manager, NoOpTransactionManager) + + def test_txn_manager_is_valid_after_pins_set(self): + card = notecard.Notecard() + with patch('notecard.notecard.TransactionManager', autospec=True): + card.SetTransactionPins(1, 2) + + assert isinstance(card._transaction_manager, TransactionManager) + + # _crc_add tests + def test_crc_add_adds_a_crc_field(self): + card = notecard.Notecard() + req = '{"req":"hub.status"}' + + req_string = card._crc_add(req, 0) + + req_json = json.loads(req_string) + assert 'crc' in req_json + + def test_crc_add_formats_the_crc_field_correctly(self): + card = notecard.Notecard() + req = '{"req":"hub.status"}' + seq_number = 37 + + req_string = card._crc_add(req, seq_number) + + req_json = json.loads(req_string) + # The format should be SSSS:CCCCCCCC, where S and C are hex digits + # comprising the sequence number and CRC32, respectively. + pattern = r'^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{8}$' + assert re.match(pattern, req_json['crc']) + + # _crc_error tests. + @pytest.mark.parametrize('crc_supported', [False, True]) + def test_crc_error_handles_lack_of_crc_field_correctly(self, crc_supported): + card = notecard.Notecard() + card._card_supports_crc = crc_supported + rsp_bytes = b'{}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error == crc_supported + + def test_crc_error_returns_error_if_sequence_number_wrong(self): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + # Sequence number should be 37 (0x25), but the response has 38 (0x26). + rsp_bytes = b'{"crc":"0026:A3A6BF43"}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error + + def test_crc_error_returns_error_if_crc_wrong(self): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + # CRC should be A3A6BF43. + rsp_bytes = b'{"crc":"0025:A3A6BF44"}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error + + @pytest.mark.parametrize( + 'rsp_bytes', + [ + # Without CRC, the response is {}. + b'{"crc":"0025:A3A6BF43"}\r\n', + # Without CRC, the response is {"connected": true}. This makes sure + # _crc_error handles the "," between the two fields properly. + b'{"connected": true,"crc": "0025:025A2457"}\r\n' + ] + ) + def test_crc_error_returns_no_error_if_sequence_number_and_crc_ok( + self, rsp_bytes): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + + error = card._crc_error(rsp_bytes) + + assert not error + + # Transaction tests. + def arrange_transaction_test(self): + card = notecard.Notecard() + card.Reset = MagicMock() + card.lock = MagicMock() + card.unlock = MagicMock() + card._transact = MagicMock(return_value=b'{}\r\n') + card._crc_error = MagicMock(return_value=False) + + return card + + @pytest.mark.parametrize('reset_required', [False, True]) + def test_transaction_calls_reset_if_needed( + self, arrange_transaction_test, reset_required): + card = arrange_transaction_test() + card._reset_required = reset_required + req = {"req": "hub.status"} + + card.Transaction(req) + + if reset_required: + card.Reset.assert_called_once() + else: + card.Reset.assert_not_called() + + @pytest.mark.parametrize('lock', [False, True]) + def test_transaction_handles_locking_correctly( + self, arrange_transaction_test, lock): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + card.Transaction(req, lock=lock) + + if lock: + card.lock.assert_called_once() + card.unlock.assert_called_once() + else: + card.lock.assert_not_called() + card.unlock.assert_not_called() + + @pytest.mark.parametrize('lock', [False, True]) + def test_transaction_handles_locking_after_exception_correctly( + self, arrange_transaction_test, lock): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises(Exception, match='Failed to transact with Notecard.'): + card.Transaction(req, lock=lock) + + if lock: + card.lock.assert_called_once() + card.unlock.assert_called_once() + else: + card.lock.assert_not_called() + card.unlock.assert_not_called() + + def test_transaction_calls_txn_manager_start_and_stop( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transaction_manager = MagicMock() + req = {"req": "hub.status"} + + card.Transaction(req) + + card._transaction_manager.start.assert_called_once() + card._transaction_manager.stop.assert_called_once() + + def test_transaction_calls_txn_manager_stop_after_exception( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transaction_manager = MagicMock() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card._transaction_manager.start.assert_called_once() + card._transaction_manager.stop.assert_called_once() + + def test_transaction_calls_reset_if_transact_fails( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._reset_required = False + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card.Reset.assert_called() + + def test_transaction_retries_on_transact_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_crc_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._crc_error.return_value = True + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_failure_to_parse_json_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + side_effect=Exception('json.loads failed.')): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_io_error_in_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + return_value={'err': 'some {io} error'}): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_does_not_retry_on_bad_bin_error_in_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + return_value={'err': 'a {bad-bin} error'}): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == 1 + + @pytest.mark.parametrize( + 'rsp_expected,return_type', + [ + (False, type(None)), + (True, dict) + ] + ) + def test_transaction_returns_proper_type( + self, rsp_expected, return_type, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + + rsp_json = card.Transaction(req) + + assert isinstance(rsp_json, return_type) + + def test_transaction_does_not_retry_if_transact_fails_and_no_response_expected( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock(return_value=(req_bytes, False)) + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card._transact.assert_called_once() + + @pytest.mark.parametrize('rsp_expected', [False, True]) + def test_transaction_increments_sequence_number_on_success( + self, rsp_expected, arrange_transaction_test): + card = arrange_transaction_test() + seq_number_before = card._last_request_seq_number + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + + card.Transaction(req) + + seq_number_after = card._last_request_seq_number + assert seq_number_after == seq_number_before + 1 + + @pytest.mark.parametrize('rsp_expected', [False, True]) + def test_transaction_increments_sequence_number_after_exception( + self, rsp_expected, arrange_transaction_test): + card = arrange_transaction_test() + seq_number_before = card._last_request_seq_number + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + card._transact.side_effect = Exception('_transact failed.') + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + seq_number_after = card._last_request_seq_number + assert seq_number_after == seq_number_before + 1 + + def test_transaction_queues_up_a_reset_on_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._reset_required = False + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._reset_required + + # Command tests. + def test_command_returns_none(self): + card = notecard.Notecard() + card.Transaction = MagicMock() + + rsp = card.Command({'cmd': 'hub.set'}) + + # A command generates no response, by definition. + assert rsp is None + + def test_command_fails_if_given_req(self): + card = notecard.Notecard() + + # Can't issue a command with 'req', must use 'cmd'. + with pytest.raises(Exception): + card.Command({'req': 'card.sleep'}) + + # UserAgentSent tests. + def test_user_agent_not_sent_before_hub_set(self): + card = notecard.Notecard() + + assert not card.UserAgentSent() + + @pytest.mark.parametrize( + 'request_method,request_key', + [ + ('Transaction', 'req'), + ('Command', 'cmd') + ] + ) + def test_user_agent_sent_after_hub_set(self, arrange_transaction_test, + request_method, request_key): + card = arrange_transaction_test() + + req = dict() + req[request_key] = 'hub.set' + method = getattr(card, request_method) + method(req) + + assert card.UserAgentSent() + + # GetUserAgent tests. def test_get_user_agent(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() + card = notecard.Notecard() + userAgent = card.GetUserAgent() assert userAgent['agent'] == 'note-python' assert userAgent['os_name'] is not None @@ -48,153 +415,22 @@ def test_get_user_agent(self): assert userAgent['os_version'] is not None assert userAgent['os_family'] is not None - def test_transaction(self): - nCard, port = self.get_port("{\"connected\":true}\r\n") - - response = nCard.Transaction({"req": "hub.status"}) - - assert "connected" in response - assert response["connected"] is True - - @patch('notecard.notecard.TransactionManager') - def test_setting_transaction_pins(self, transaction_manager_mock): - nCard, _ = self.get_port("{\"connected\":true}\r\n") - - nCard.SetTransactionPins(1, 2) - nCard.Transaction({"req": "hub.status"}) - - # If transaction pins have been set, start and stop should be called - # once for each Transaction call. - nCard._transaction_manager.start.assert_called_once() - nCard._transaction_manager.stop.assert_called_once() - - def test_command(self): - nCard, port = self.get_port() - - response = nCard.Command({"cmd": "card.sleep"}) - - assert response is None - - def test_command_fail_if_req(self): - nCard, port = self.get_port() - - with pytest.raises(Exception, - match="Please use 'cmd' instead of 'req'"): - nCard.Command({"req": "card.sleep"}) - - def test_user_agent_sent_is_false_before_hub_set(self): - nCard, _ = self.get_port() - - assert nCard.UserAgentSent() is False - - def test_send_user_agent_in_hub_set_transaction(self): - nCard, port = self.get_port("{\"connected\":true}\r\n") - - nCard.Transaction({"req": "hub.set"}) - - assert nCard.UserAgentSent() is True - - def get_port(self, response=None): - raise NotImplementedError("subclasses must implement `get_port()`") - - -class TestNotecardMockSerial(NotecardTest): - - def get_port(self, response=None): - nCard, port = get_serial_and_port() - if response is not None: - port.readline.return_value = response - return (nCard, port) - - def test_user_agent_is_serial_when_serial_used(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() - - assert userAgent['req_interface'] == 'serial' - assert userAgent['req_port'] is not None - - def test_open_serial(self): - nCard, _ = get_serial_and_port() - - assert nCard.uart is not None - - def test_debug_mode_on_serial(self): - # Patch the Reset method so that we don't actually call it during - # __init__. - with patch('notecard.notecard.OpenSerial.Reset'): - port = MagicMock() - nCard = notecard.OpenSerial(port, debug=True) - - assert nCard._debug - - -class TestNotecardMockI2C(NotecardTest): - - def get_port(self, response=None): - nCard, port = get_i2c_and_port() - if response is not None: - chunklen = 0 - tosend = bytes(response, 'utf-8') - - def writeto_then_readfrom(addr, write, read): - nonlocal chunklen, tosend - read[0] = len(tosend) - read[1] = chunklen - read[2:2 + chunklen] = tosend[0:chunklen] - tosend = tosend[chunklen:] - chunklen = len(tosend) - - def transfer(addr, messages: periphery.I2C.Message): - if len(messages) == 2 and messages[1].read: - read = messages[1].data - writeto_then_readfrom(addr, messages[0].data, read) - - port.writeto_then_readfrom = writeto_then_readfrom - port.transfer = transfer - return (nCard, port) - - def test_open_i2c(self): - nCard, _ = get_i2c_and_port() - - assert nCard.i2c is not None - - def test_user_agent_is_i2c_when_i2c_used(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() - - assert userAgent['req_interface'] == 'i2c' - assert userAgent['req_port'] is not None - - def test_debug_mode_on_i2c(self): - periphery = Mock() # noqa: F811 - port = periphery.I2C("dev/i2c-foo") - port.try_lock.return_value = True - - nCard = notecard.OpenI2C(port, 0x17, 255, debug=True) - - assert nCard._debug - - -class MockNotecard(notecard.Notecard): - - def Reset(self): - pass - - -class TestUserAgent: - - def setUserAgentInfo(self, info=None): - nCard = MockNotecard() - orgReq = {"req": "hub.set"} - nCard.SetAppUserAgent(info) - req = json.loads(nCard._prepare_request(orgReq)) + # SetAppUserAgent tests. + def set_user_agent_info(self, info=None): + card = notecard.Notecard() + req = {"req": "hub.set"} + card.SetAppUserAgent(info) + req = json.loads(card._prepare_request(req)[0]) return req - def test_amends_hub_set_request(self): - req = self.setUserAgentInfo() + def test_set_app_user_agent_amends_hub_set_request(self): + req = self.set_user_agent_info() + assert req['body'] is not None - def test_adds_app_info(self): + def test_set_app_user_agent_adds_app_info_to_hub_set_request(self): info = {"app": "myapp"} - req = self.setUserAgentInfo(info) + + req = self.set_user_agent_info(info) + assert req['body']['app'] == 'myapp' diff --git a/test/test_serial.py b/test/test_serial.py new file mode 100644 index 0000000..198e47e --- /dev/null +++ b/test/test_serial.py @@ -0,0 +1,351 @@ +import os +import sys +import pytest +from unittest.mock import MagicMock, patch +from filelock import FileLock +from contextlib import AbstractContextManager +from .unit_test_utils import TrueOnNthIteration, BooleanToggle + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import notecard # noqa: E402 +from notecard import NoOpSerialLock, NoOpContextManager # noqa: E402 + + +@pytest.fixture +def arrange_test(): + def _arrange_test(debug=False): + # OpenSerial's __init__ will call Reset, which we don't care about + # actually doing here, so we mock Reset. + with patch('notecard.notecard.OpenSerial.Reset'): + card = notecard.OpenSerial(MagicMock(), debug=debug) + + return card + + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_test + + +@pytest.fixture +def arrange_reset_test(arrange_test): + def _arrange_reset_test(): + card = arrange_test() + card.lock = MagicMock() + card.unlock = MagicMock() + card.uart.write = MagicMock() + + return card + + yield _arrange_reset_test + + +@pytest.fixture +def tramsit_test_data(): + # Create a bytearray to transmit. It should be larger than a single segment, + # and it should not fall neatly onto a segment boundary. + data_len = notecard.CARD_REQUEST_SEGMENT_MAX_LEN * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + + return data + + +@pytest.fixture +def arrange_transact_test(arrange_test): + def _arrange_transact_test(): + card = arrange_test() + card.transmit = MagicMock() + card.receive = MagicMock() + req_bytes = card._prepare_request({'req': 'card.version'}) + + return card, req_bytes + + yield _arrange_transact_test + + +class TestSerial: + # Reset tests. + def test_reset_succeeds_on_good_notecard_response(self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + assert not card._reset_required + + def test_reset_sends_a_newline_to_clear_stale_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.uart.write.assert_called_once_with(b'\n') + + def test_reset_locks_and_unlocks(self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_unlocks_after_exception(self, arrange_reset_test): + card = arrange_reset_test() + card.uart.write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_fails_if_continually_reads_non_control_chars( + self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=BooleanToggle(True)) + card._read_byte = MagicMock(return_value=b'h') + + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + def test_reset_required_if_reset_fails(self, arrange_reset_test): + card = arrange_reset_test() + card.uart.write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + assert card._reset_required + + # __init__ tests. + @patch('notecard.notecard.OpenSerial.Reset') + def test_init_calls_reset(self, reset_mock): + notecard.OpenSerial(MagicMock()) + + reset_mock.assert_called_once() + + @pytest.mark.parametrize( + 'use_serial_lock,lock_type', + [ + (False, NoOpSerialLock), + (True, FileLock) + ] + ) + def test_init_creates_appropriate_lock_type( + self, use_serial_lock, lock_type, arrange_test): + with patch('notecard.notecard.use_serial_lock', new=use_serial_lock): + card = arrange_test() + + assert isinstance(card.lock_handle, lock_type) + + def test_init_fails_if_not_micropython_and_uart_has_no_in_waiting_attr( + self): + exception_msg = ('Serial communications with the Notecard are not ' + 'supported for this platform.') + + with patch('notecard.notecard.sys.implementation.name', new='cpython'): + with patch('notecard.notecard.OpenSerial.Reset'): + with pytest.raises(Exception, match=exception_msg): + notecard.OpenSerial(42) + + @pytest.mark.parametrize( + 'platform,available_method', + [ + ('micropython', notecard.OpenSerial._available_micropython), + ('cpython', notecard.OpenSerial._available_default), + ('circuitpython', notecard.OpenSerial._available_default), + ] + ) + def test_available_method_is_set_correctly_on_init( + self, platform, available_method, arrange_test): + with patch('notecard.notecard.sys.implementation.name', new=platform): + card = arrange_test() + + assert card._available.__func__ == available_method + + @pytest.mark.parametrize('debug', [False, True]) + def test_debug_set_correctly_on_init(self, debug, arrange_test): + card = arrange_test(debug) + + assert card._debug == debug + + def test_user_agent_indicates_serial_after_init(self, arrange_test): + card = arrange_test() + userAgent = card.GetUserAgent() + + assert userAgent['req_interface'] == 'serial' + assert userAgent['req_port'] is not None + + # _transact tests. + def test_transact_calls_transmit_with_req_bytes( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + card._transact(req_bytes, rsp_expected=False) + + card.transmit.assert_called_once_with(req_bytes) + + def test_transact_returns_none_if_rsp_not_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + rsp = card._transact(req_bytes, rsp_expected=False) + + assert rsp is None + + def test_transact_returns_not_none_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=True) + + rsp = card._transact(req_bytes, rsp_expected=True) + + assert rsp is not None + + def test_transact_calls_receive_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=True) + + card._transact(req_bytes, rsp_expected=True) + + card.receive.assert_called_once() + + def test_transact_raises_exception_on_timeout(self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=False) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, + match=('Timed out while querying Notecard for ' + 'available data.')): + card._transact(req_bytes, rsp_expected=True) + + # transmit tests. + def test_transmit_writes_all_data_bytes( + self, arrange_test, tramsit_test_data): + card = arrange_test() + card.uart.write = MagicMock() + + card.transmit(tramsit_test_data, True) + + # Using the argument history of the uart.write mock, assemble a + # bytearray of the data passed to uart.write. + written = bytearray() + for write_call in card.uart.write.call_args_list: + segment = write_call[0][0] + written += segment + # Verify that all the data we passed to transmit was in fact passed to + # uart.write. + assert tramsit_test_data == written + + def test_transmit_does_not_exceed_max_segment_length( + self, arrange_test, tramsit_test_data): + card = arrange_test() + card.uart.write = MagicMock() + + card.transmit(tramsit_test_data) + + for write_call in card.uart.write.call_args_list: + segment = write_call.args[0] + assert len(segment) <= notecard.CARD_REQUEST_SEGMENT_MAX_LEN + + # receive tests. + def test_receive_raises_exception_on_timeout(self, arrange_test): + card = arrange_test() + card._available = MagicMock(return_value=False) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=[False, True]): + with pytest.raises(Exception, match=('Timed out waiting to receive ' + 'data from Notecard.')): + card.receive() + + def test_receive_returns_all_bytes_from_read_byte( + self, arrange_test): + card = arrange_test() + read_byte_mock = MagicMock() + read_byte_mock.side_effect = [b'{', b'}', b'\r', b'\n'] + card._read_byte = read_byte_mock + card._available = MagicMock(return_value=True) + expected_data = bytearray('{}\r\n'.encode('utf-8')) + + data = card.receive() + + # Verify that all the bytes returned by _read_byte were returned as a + # bytearray by receive. + assert data == expected_data + + # _read_byte tests. + def test_read_byte_calls_uart_read(self, arrange_test): + card = arrange_test() + card.uart.read = MagicMock() + + card._read_byte() + + card.uart.read.assert_called_once_with(1) + + # NoOpSerialLock tests. + def test_no_op_serial_lock_implements_acquire_and_release(self): + no_op_lock = NoOpSerialLock() + + assert hasattr(no_op_lock, 'acquire') + assert hasattr(no_op_lock, 'release') + + def test_no_op_serial_lock_acquire_returns_no_op_context_manager(self): + no_op_lock = NoOpSerialLock() + + assert isinstance(no_op_lock.acquire(), NoOpContextManager) + + def test_no_op_serial_lock_acquire_accepts_timeout_arg(self): + no_op_lock = NoOpSerialLock() + + no_op_lock.acquire(timeout=10) + + # NoOpContextManager tests. + def test_no_op_context_manager_is_a_context_manager(self): + manager = NoOpContextManager() + + with manager: + pass + + assert isinstance(manager, AbstractContextManager) + + # lock/unlock tests. + def test_lock_calls_acquire_on_underlying_lock(self, arrange_test): + card = arrange_test() + lock_handle_mock = MagicMock() + card.lock_handle = lock_handle_mock + + card.lock() + + lock_handle_mock.acquire.assert_called_once() + + def test_unlock_calls_release_on_underlying_lock(self, arrange_test): + card = arrange_test() + lock_handle_mock = MagicMock() + card.lock_handle = lock_handle_mock + + card.unlock() + + lock_handle_mock.release.assert_called_once() diff --git a/test/unit_test_utils.py b/test/unit_test_utils.py new file mode 100644 index 0000000..9bc8ad1 --- /dev/null +++ b/test/unit_test_utils.py @@ -0,0 +1,40 @@ +class TrueOnNthIteration: + """Iterable that returns False until Nth iteration, then it returns True.""" + + def __init__(self, n): + """Set the iteration to return True on.""" + self.n = n + + def __iter__(self): + self.current = 1 + return self + + def __next__(self): + if self.current > self.n: + raise StopIteration + elif self.current == self.n: + result = True + else: + result = False + + self.current += 1 + + return result + + +class BooleanToggle: + """Iterable that returns a toggling boolean.""" + + def __init__(self, initial_value): + """Set the initial state (i.e. False or True).""" + self.initial_value = initial_value + + def __iter__(self): + self.current = self.initial_value + return self + + def __next__(self): + result = self.current + self.current = not self.current + + return result