From 0a3c0b8ce270912211ccf82d2126742bfeaeeeb8 Mon Sep 17 00:00:00 2001 From: Hayden Roche Date: Sun, 17 Sep 2023 21:26:56 -0700 Subject: [PATCH 1/4] Make changes to support binary transfers. - Add CRC support. - Align serial and I2C code with note-c. This encompasses mimicking the algorithms used in note-c and also things like delay lengths, number of retries, transaction timeout lengths, etc. - Align transaction logic with note-c. The majority of this work is in the reworked `Transaction` function, which is analogous to `noteTransactionShouldLock` in note-c. - Rework serial/I2C locking so that the lock can be held for the entirety of a binary transfer. For example, for a binary receive operation, the host needs to send a `card.binary.get` and then immediately receive the binary data from the Notecard. The serial/I2C lock should be held for the entirety of this operation, rather than be released after the `card.binary.get` and reacquired for the receipt of the binary data. - Add two methods for both serial and I2C: `transmit` and `receive`. `transmit` is used to transmit arbitrary bytes to the Notecard (e.g. after `card.binary.put`). `receive` is used to read a stream of bytes from the Notecard, stopping after reading a newline (e.g. after a `card.binary.get`). These are analogous to the `chunked(Receive|Transmit)` functions in note-c. - Overhaul unit testing. This commit adds many new unit tests, in addition to reorganizing things. Tests for the `Notecard` base class are in test_notecard.py. `OpenSerial` tests are in test_serial.py, and `OpenI2C` tests are in test_i2c.py. There is a some code repetition here, especially between the `_transact` and `Reset` methods of `OpenSerial` and `OpenI2C`. Again, this follows the pattern in note-c and was done consciously. We may want to factor out that repetition in the future, but for now, I'm prioritizing parity with the "source of truth" (note-c). --- notecard/crc32.py | 33 ++ notecard/notecard.py | 866 ++++++++++++++++++++++++++++------------ test/__init__.py | 0 test/test_i2c.py | 452 +++++++++++++++++++++ test/test_notecard.py | 580 +++++++++++++++++++-------- test/test_serial.py | 341 ++++++++++++++++ test/unit_test_utils.py | 40 ++ 7 files changed, 1875 insertions(+), 437 deletions(-) create mode 100644 notecard/crc32.py create mode 100644 test/__init__.py create mode 100644 test/test_i2c.py create mode 100644 test/test_serial.py create mode 100644 test/unit_test_utils.py diff --git a/notecard/crc32.py b/notecard/crc32.py new file mode 100644 index 0000000..4a2d208 --- /dev/null +++ b/notecard/crc32.py @@ -0,0 +1,33 @@ +"""Module for computing the CRC32 of arbitrary data.""" + +crc32_lookup_table = [ + 0x00000000, 0x1DB71064, 0x3B6E20C8, 0x26D930AC, 0x76DC4190, 0x6B6B51F4, + 0x4DB26158, 0x5005713C, 0xEDB88320, 0xF00F9344, 0xD6D6A3E8, 0xCB61B38C, + 0x9B64C2B0, 0x86D3D2D4, 0xA00AE278, 0xBDBDF21C +] + + +def _logical_rshift(val, shift_amount, num_bits=32): + """Logcally right shift `val` by `shift_amount` bits. + + Logical right shift (i.e. right shift that fills with 0s instead of the + sign bit) isn't supported natively in Python. This is a simple + implementation. See: + https://realpython.com/python-bitwise-operators/#arithmetic-vs-logical-shift + """ + unsigned_val = val % (1 << num_bits) + return unsigned_val >> shift_amount + + +def crc32(data): + """Compute CRC32 of the given data. + + Small lookup-table half-byte CRC32 algorithm based on: + https://create.stephan-brumme.com/crc32/#half-byte + """ + crc = ~0 + for idx in range(len(data)): + crc = crc32_lookup_table[(crc ^ data[idx]) & 0x0F] ^ _logical_rshift(crc, 4) + crc = crc32_lookup_table[(crc ^ _logical_rshift(data[idx], 4)) & 0x0F] ^ _logical_rshift(crc, 4) + + return ~crc & 0xffffffff diff --git a/notecard/notecard.py b/notecard/notecard.py index d36f2b6..0aa9e84 100644 --- a/notecard/notecard.py +++ b/notecard/notecard.py @@ -36,12 +36,12 @@ import time from .timeout import start_timeout, has_timed_out from .transaction_manager import TransactionManager, NoOpTransactionManager +from .crc32 import crc32 use_periphery = False use_serial_lock = False if sys.implementation.name == 'cpython' and (sys.platform == 'linux' or sys.platform == 'linux2'): - use_periphery = True from periphery import I2C @@ -54,19 +54,32 @@ class SerialLockTimeout(Exception): pass +use_i2c_lock = not use_periphery and sys.implementation.name != 'micropython' + NOTECARD_I2C_ADDRESS = 0x17 +NOTECARD_I2C_MAX_TRANSFER_DEFAULT = 255 -# The notecard is a real-time device that has a fixed size interrupt buffer. -# We can push data at it far, far faster than it can process it, -# therefore we push it in segments with a pause between each segment. +# The notecard is a real-time device that has a fixed size interrupt buffer. We +# can push data at it far, far faster than it can process it. Therefore, we push +# it in segments with a pause between each segment. CARD_REQUEST_SEGMENT_MAX_LEN = 250 -# "a 250ms delay is required to separate "segments", ~256 byte -# I2C transactions." See +# "a 250ms delay is required to separate "segments", ~256 byte I2C +# transactions." See # https://dev.blues.io/guides-and-tutorials/notecard-guides/serial-over-i2c-protocol/#data-write CARD_REQUEST_SEGMENT_DELAY_MS = 250 # "A 20ms delay is commonly used to separate smaller I2C transactions known as # 'chunks'". See the same document linked above. -I2C_CHUNK_DELAY_MS = 20 +CARD_REQUEST_I2C_CHUNK_DELAY_MS = 20 +# The delay, in miliseconds, to wait after receiving a NACK I2C. +CARD_REQUEST_I2C_NACK_WAIT_MS = 1000 +# The number of times to retry syncing up with the Notecard during a reset +# before giving up. +CARD_RESET_SYNC_RETRIES = 10 +# The time, in miliseconds, to drain incoming messages during a reset. +CARD_RESET_DRAIN_MS = 500 +CARD_INTER_TRANSACTION_TIMEOUT_SEC = 30 +CARD_INTRA_TRANSACTION_TIMEOUT_SEC = 1 +CARD_TRANSACTION_RETRIES = 5 class NoOpContextManager: @@ -88,51 +101,16 @@ def acquire(*args, **kwargs): """Acquire the no-op lock.""" return NoOpContextManager() - -def serial_lock(fn): - """Attempt to get a lock on the serial channel used for Notecard comms.""" - - def decorator(self, *args, **kwargs): - try: - with self.lock.acquire(timeout=5): - return fn(self, *args, **kwargs) - except SerialLockTimeout: - raise Exception('Notecard in use') - - return decorator - - -def i2c_lock(fn): - """Attempt to get a lock on the I2C bus used for Notecard comms.""" - - def decorator(self, *args, **kwargs): - retries = 5 - while retries != 0: - if self.lock(): - break - - retries -= 1 - # Try again after 100 ms. - time.sleep(.1) - - if retries == 0: - raise Exception('Failed to acquire I2C lock.') - - try: - ret = fn(self, *args, **kwargs) - finally: - self.unlock() - - return ret - - return decorator + def release(*args, **kwargs): + """Release the no-op lock.""" + pass class Notecard: """Base Notecard class.""" def __init__(self, debug=False): - """Configure user agent.""" + """Initialize the Notecard object.""" self._user_agent_app = None self._user_agent_sent = False self._user_agent = { @@ -147,6 +125,66 @@ def __init__(self, debug=False): self._user_agent['os_family'] = os.uname().machine self._transaction_manager = NoOpTransactionManager() self._debug = debug + self._last_request_seq_number = 0 + self._card_supports_crc = False + self._reset_required = True + + def _crc_add(self, req, seq_number): + """Add a CRC field to the request. + + The CRC field also contains a sequence number and has this format: + + "crc":"SSSS:CCCCCCCC" + + SSSS is the sequence number encoded as a string of 4 hex digits. + CCCCCCCC is the CRC32 encoded as a string of 8 hex digits. + """ + req_bytes = json.dumps(req, separators=(',', ':')).encode('utf-8') + crc_hex = '{:08x}'.format(crc32(req_bytes)) + seq_number_hex = '{:04x}'.format(seq_number) + req['crc'] = f'{seq_number_hex}:{crc_hex}' + + def _crc_error(self, rsp_bytes): + """Check the CRC in a Notecard response.""" + rsp_json = json.loads(rsp_bytes) + if 'crc' not in rsp_json: + # If there's not a 'crc' field in the response, it's only an error + # if the Notecard supports CRC. + return self._card_supports_crc + + self._card_supports_crc = True + + # Extract the sequence number and CRC. We do all this via string + # operations instead of decoding the JSON. In Python, numbers with long + # decimal parts (e.g. 10.11111111111111123445522123) get truncated in + # the decoding process. When re-encoded, the string representation is + # also truncated, and so the CRC will be computed over a different + # string than was originally sent, resulting in a CRC error. + seq_number, crc = rsp_json['crc'].split(':') + # Remove the 'crc' field from the response. + rsp_str = rsp_bytes.decode() + rsp_str_crc_removed = rsp_str.split('"crc":')[0] + if rsp_str_crc_removed[-1] == ',': + rsp_str_crc_removed = rsp_str_crc_removed[:-1] + '}' + else: + rsp_str_crc_removed = rsp_str_crc_removed.rstrip() + '}' + + # Compute the CRC over the response, with the 'crc' field removed. + bytes_for_crc = rsp_str_crc_removed.encode('utf-8') + computed_crc = '{:08x}'.format(crc32(bytes_for_crc)).upper() + expected_seq_number = '{:04x}'.format(self._last_request_seq_number) + + if seq_number != expected_seq_number: + if self._debug: + print(('Sequence number mismatch. Expected ' + f'{expected_seq_number}, received {seq_number}.')) + return True + elif crc != computed_crc: + if self._debug: + print(f'CRC error. Computed {computed_crc}, received {crc}.') + return True + + return False def _prepare_request(self, req): """Prepare a request for transmission to the Notecard.""" @@ -158,34 +196,184 @@ def _prepare_request(self, req): self._user_agent_sent = True - # Serialize the JSON request to a string. - req_string = json.dumps(req) + rsp_expected = 'req' in req + + # If this is a request and not a command, add a CRC. + if rsp_expected: + self._crc_add(req, self._last_request_seq_number) + + # Serialize the JSON request to a string, removing any unnecessary + # whitespace. + req_string = json.dumps(req, separators=(',', ':')) if self._debug: print(req_string) req_string += "\n" # Encode the request string as UTF-8 bytes. - return req_string.encode('utf-8') + return (req_string.encode('utf-8'), rsp_expected) + + def _transaction_timeout_seconds(self, req): + """Determine the timeout to use, in seconds, for the transaction. + + When note.add or web.* requests are used to transfer binary data, the + time to complete the transaction varies depending on the size of the + payload and network conditions. Therefore, it's possible for these + transactions to timeout prematurely. + + This method does the following: + - If the request is a `note.add`, set the timeout value to the + value of the "milliseconds" parameter, if it exists. If it + doesn't, use the "seconds" parameter. If that doesn't exist, + use the standard timeout of `CARD_INTER_TRANSACTION_TIMEOUT_SEC`. + - If the request is a `web.*`, follow the same logic, but instead + of using the standard timeout, use 90 seconds for all `web.*` + transactions. + """ + timeout_secs = CARD_INTER_TRANSACTION_TIMEOUT_SEC + if 'req' in req: + req_key = 'req' + elif 'cmd' in req: + req_key = 'cmd' + else: + raise Exception(('Malformed request. Missing \'req\' or \'cmd\' ' + f'field: {req}.')) + + if req[req_key] == 'note.add': + if 'milliseconds' in req: + timeout_secs = req['milliseconds'] / 1000 + elif 'seconds' in req: + timeout_secs = req['seconds'] + elif 'web.' in req[req_key]: + if 'milliseconds' in req: + timeout_secs = req['milliseconds'] / 1000 + elif 'seconds' in req: + timeout_secs = req['seconds'] + else: + timeout_secs = 90 - def Command(self, req): - """Send a command to the Notecard. The Notecard response is ignored.""" - if 'cmd' not in req: - raise Exception("Please use 'cmd' instead of 'req'") + if self._debug: + print(f'Using transaction timeout of {timeout_secs} seconds.') - req_bytes = self._prepare_request(req) - self._transact(req_bytes, False) + return timeout_secs - def Transaction(self, req): - """Perform a Notecard transaction and return the result.""" - req_bytes = self._prepare_request(req) - rsp_bytes = self._transact(req_bytes, True) - rsp_json = json.loads(rsp_bytes) - if self._debug: + def Transaction(self, req, lock=True): + """Send a request to the Notecard and read back a response. + + If the request is a command (indicated by using 'cmd' in the request + instead of 'req'), don't return a response. + + The underlying transport channel (serial or I2C) is locked for the + duration of the request and response if `lock` is True. + """ + rsp_json = None + timeout_secs = self._transaction_timeout_seconds(req) + req_bytes, rsp_expected = self._prepare_request(req) + + if self._reset_required: + self.Reset() + + try: + self._transaction_manager.start(CARD_INTER_TRANSACTION_TIMEOUT_SEC) + if lock: + self.lock() + + retries_left = CARD_TRANSACTION_RETRIES + error = False + if rsp_expected: + while retries_left > 0: + try: + rsp_bytes = self._transact( + req_bytes, rsp_expected=True, + timeout_secs=timeout_secs) + except Exception as e: + if self._debug: + print(e) + + error = True + self.Reset() + retries_left -= 1 + time.sleep(0.5) + continue + + if self._crc_error(rsp_bytes): + if self._debug: + print('CRC error on response from Notecard.') + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + + try: + rsp_json = json.loads(rsp_bytes) + except Exception as e: + if self._debug: + print(e) + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + + if 'err' in rsp_json: + if '{io}' in rsp_json['err']: + if self._debug: + print(('Response has error field indicating I/O' + ' error.')) + + error = True + retries_left -= 1 + time.sleep(0.5) + continue + elif '{bad-bin}' in rsp_json['err']: + if self._debug: + print(('Response has error field indicating ' + 'binary I/O error. Not eligible for ' + 'retry.')) + + error = True + break + + error = False + break + else: + try: + self._transact(req_bytes, rsp_expected=False, + timeout_secs=timeout_secs) + except Exception as e: + error = True + if self._debug: + print(e) + + self._last_request_seq_number += 1 + + if error: + self._reset_required = True + raise Exception('Failed to transact with Notecard.') + + finally: + if lock: + self.unlock() + + self._transaction_manager.stop() + + if self._debug and rsp_json is not None: print(rsp_json) return rsp_json + def Command(self, req): + """Send a command to the Notecard. + + Unlike `Transaction`, `Command` doesn't return a response from the + Notecard. + """ + if 'cmd' not in req: + raise Exception("Please use 'cmd' instead of 'req'") + + self.Transaction(req) + def GetUserAgent(self): """Return the User Agent String for the host for debug purposes.""" ua_copy = self._user_agent.copy() @@ -208,96 +396,155 @@ def SetTransactionPins(self, rtx_pin, ctx_pin): class OpenSerial(Notecard): """Notecard class for Serial communication.""" - @serial_lock - def _transact(self, req, rsp_expected): - """Perform a low-level transaction with the Notecard.""" - rsp = None + def _transact(self, req_bytes, rsp_expected, + timeout_secs=CARD_INTER_TRANSACTION_TIMEOUT_SEC): + self.transmit(req_bytes) - try: - transaction_timeout_secs = 30 - self._transaction_manager.start(transaction_timeout_secs) - - seg_off = 0 - seg_left = len(req) - while seg_left > 0: - seg_len = seg_left - if seg_len > CARD_REQUEST_SEGMENT_MAX_LEN: - seg_len = CARD_REQUEST_SEGMENT_MAX_LEN - - self.uart.write(req[seg_off:seg_off + seg_len]) - seg_off += seg_len - seg_left -= seg_len - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + if not rsp_expected: + return - if rsp_expected: - rsp = self.uart.readline() - finally: - self._transaction_manager.stop() + start = start_timeout() + while not self._available(): + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception(('Timed out while querying Notecard for ' + 'available data.')) - return rsp + # Delay for 10 ms before checking for available data again. + time.sleep(.01) - def _read_byte_micropython(self): - """Read a single byte from the Notecard (MicroPython).""" - if not self.uart.any(): - return None - return self.uart.read(1) + return self.receive() - def _read_byte_cpython(self): - """Read a single byte from the Notecard (CPython).""" - if self.uart.in_waiting == 0: - return None - return self.uart.read(1) + def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, + delay=True): + """Read a newline-terminated batch of data from the Notecard.""" + data = bytearray() + received_newline = False + start = start_timeout() + + while not received_newline: + while not self._available(): + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception(('Timed out waiting to receive data from ' + 'Notecard.')) + + # Sleep while awaiting the first byte (lazy). After the first + # byte, start to spin for the remaining bytes (greedy). + if delay and len(data) == 0: + time.sleep(.001) + + timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC + byte = self._read_byte() + data.extend(byte) + received_newline = byte == b'\n' + + return data + + def transmit(self, data, delay=True): + """Send `data` to the Notecard.""" + seg_off = 0 + seg_left = len(data) - def _read_byte_circuitpython(self): - """Read a single byte from the Notecard (CircuitPython).""" + while seg_left > 0: + seg_len = seg_left + if seg_len > CARD_REQUEST_SEGMENT_MAX_LEN: + seg_len = CARD_REQUEST_SEGMENT_MAX_LEN + + self.uart.write(data[seg_off:seg_off + seg_len]) + seg_off += seg_len + seg_left -= seg_len + + if delay: + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + + def _available_micropython(self): + return self.uart.any() + + def _available_default(self): + return self.uart.in_waiting > 0 + + def _read_byte(self): + """Read a single byte from the Notecard.""" return self.uart.read(1) - @serial_lock def Reset(self): """Reset the Notecard.""" + if self._debug: + print('Resetting Notecard serial communications.') + + # Delay to give the Notecard a chance to process any segment sent prior + # to the coming reset sequence. + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + notecard_ready = False - for i in range(10): - try: - # Send a newline to the Notecard to terminate any partial - # request that might be sitting in its input buffer. - self.uart.write(b'\n') - except: - # Wait 500 ms and before trying to send the newline again. - time.sleep(.5) - continue + try: + self.lock() + + for i in range(CARD_RESET_SYNC_RETRIES): + try: + # Send a newline to the Notecard to terminate any partial + # request that might be sitting in its input buffer. + self.uart.write(b'\n') + except Exception as e: + if self._debug: + print(e) + # Wait CARD_RESET_DRAIN_MS and before trying to send the + # newline again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + continue + + something_found = False + non_control_char_found = False + # Drain serial for 500 ms. + start = start_timeout() + while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): + while self._available(): + something_found = True + data = self._read_byte() + if data[0] != ord('\n') and data[0] != ord('\r'): + non_control_char_found = True + # Reset the timer with each non-control character. + start = start_timeout() + + # If there was no data read from the Notecard, wait 1 ms and + # try again. Keep doing this for CARD_RESET_DRAIN_MS. + time.sleep(.001) + + if not something_found: + if self._debug: + print(('Notecard not responding to newline during ' + 'reset.')) + + elif non_control_char_found: + if self._debug: + print(('Received non-control characters from the ' + 'Notecard during reset.')) + else: + # If all we got back is newlines, we're in sync with the + # Notecard. + notecard_ready = True + break + + if self._debug: + print('Retrying reset...') + + # Wait CARD_RESET_DRAIN_MS before trying again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + + if not notecard_ready: + raise Exception('Failed to reset Notecard.') - something_found = False - non_control_char_found = False - # Drain serial for 500 ms. - start = start_timeout() - while not has_timed_out(start, 0.5): - data = self._read_byte() - # If data was read from the Notecard, inspect what we received. - # If it isn't a \n or \r, the host and the Notecard aren't - # synced up yet, and we'll need to retransmit the \n and try - # again. - while data is not None and data != b'': - something_found = True - if data[0] != ord('\n') and data[0] != ord('\r'): - non_control_char_found = True - - data = self._read_byte() - - # If there was no data read from the Notecard, wait 1 ms and try - # again. Keep doing this for 500 ms. - time.sleep(.001) - - # If we received anything other than newlines from the Notecard, we - # aren't in sync, yet. - if something_found and not non_control_char_found: - notecard_ready = True - break + finally: + self.unlock() - # Wait 500 ms before trying again. - time.sleep(.5) + self._reset_required = False - if not notecard_ready: - raise Exception('Failed to reset Notecard.') + def lock(self): + """Lock access to the serial bus.""" + self.lock_handle.acquire(timeout=5) + + def unlock(self): + """Unlock access to the serial bus.""" + self.lock_handle.release() def __init__(self, uart_id, debug=False): """Initialize the Notecard before a reset.""" @@ -308,18 +555,14 @@ def __init__(self, uart_id, debug=False): self.uart = uart_id if use_serial_lock: - self.lock = FileLock('serial.lock') + self.lock_handle = FileLock('serial.lock') else: - self.lock = NoOpSerialLock() + self.lock_handle = NoOpSerialLock() if sys.implementation.name == 'micropython': - self._read_byte = self._read_byte_micropython - elif sys.implementation.name == 'cpython': - self._read_byte = self._read_byte_cpython - elif sys.implementation.name == 'circuitpython': - self._read_byte = self._read_byte_circuitpython + self._available = self._available_micropython else: - raise NotImplementedError(f'Unsupported platform: {sys.implementation.name}') + self._available = self._available_default self.Reset() @@ -327,36 +570,8 @@ def __init__(self, uart_id, debug=False): class OpenI2C(Notecard): """Notecard class for I2C communication.""" - def _write(self, data): - write_length = bytearray(1) - write_length[0] = len(data) - - # Send a message with the length of the incoming bytes followed - # by the bytes themselves. - self._platform_write(write_length, data) - - def _transmit(self, data): - chunk_offset = 0 - data_left = len(data) - sent_in_seg = 0 - - while data_left > 0: - chunk_len = min(data_left, self.max) - write_data = data[chunk_offset:chunk_offset + chunk_len] - - self._write(write_data) - - chunk_offset += chunk_len - data_left -= chunk_len - sent_in_seg += chunk_len - - if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: - sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) - - time.sleep(I2C_CHUNK_DELAY_MS / 1000) - def _read(self, length): + """Perform a serial-over-I2C read.""" initiate_read = bytearray(2) # 0 indicates we are reading from the Notecard. initiate_read[0] = 0 @@ -366,116 +581,250 @@ def _read(self, length): # length accounts for the payload and the +2 is for the header. The # header sent by the Notecard has one byte to indicate the number of # bytes still available to read and a second byte to indicate the number - # of bytes coming in the current chunk. + # of bytes coming in the current packet. read_buf = bytearray(length + 2) - return self._platform_read(initiate_read, read_buf) + self._platform_read(initiate_read, read_buf) + # First two bytes are the header. + header = read_buf[0:2] + # The number of bytes still available to read after this packet. + available = header[0] + # The number of data bytes in this packet. + data_len = header[1] + # The rest is the data. + data = read_buf[2:] + + return (available, data_len, data) - def _receive(self, timeout_secs, chunk_delay_secs, wait_for_newline): - chunk_len = 0 + def _write(self, data): + """Perform a serial-over-I2C write.""" + self._platform_write(bytearray([len(data)]), data) + + def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, + delay=True): + """Read a newline-terminated batch of data from the Notecard.""" + read_len = 0 received_newline = False + timeout_secs = CARD_INTER_TRANSACTION_TIMEOUT_SEC start = start_timeout() - read_data = bytearray() + received_data = bytearray() while True: - read_buf = self._read(chunk_len) - - # The number of bytes still available to read. - num_bytes_available = read_buf[0] - # The number of bytes in this chunk. - num_bytes_this_chunk = read_buf[1] - if num_bytes_this_chunk > 0: - read_data += read_buf[2:2 + num_bytes_this_chunk] - received_newline = read_buf[-1] == ord('\n') - - chunk_len = min(num_bytes_available, self.max) - # Keep going if there's still byte available to read, even if + available, data_len, data = self._read(read_len) + if data_len > 0: + received_data += data + + timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC + start = start_timeout() + + if not received_newline: + received_newline = data[-1] == ord('\n') + + read_len = min(available, self.max) + # Keep going if there are still bytes available to read, even if # we've received a newline. - if chunk_len > 0: + if available > 0: continue - # Otherwise, if there's no bytes available to read and we either + # Otherwise, if there are no bytes available to read and we either # 1) don't care about waiting for a newline or 2) do care and # received the newline, we're done. - if not wait_for_newline or received_newline: + if received_newline: break - # Delay between reading chunks. Note that as long as bytes are - # available to read (i.e. chunk_len > 0), we don't delay here, nor - # do we check the timeout below. This is intentional and mimics the - # behavior of other SDKs (e.g. note-c). - time.sleep(chunk_delay_secs) - if timeout_secs != 0 and has_timed_out(start, timeout_secs): - raise Exception("Timed out while reading data from the Notecard.") + raise Exception(('Timed out while reading data from the ' + 'Notecard.')) - return read_data + if delay: + time.sleep(0.05) - @i2c_lock - def _transact(self, req, rsp_expected): - """Perform a low-level transaction with the Notecard.""" - rsp = None + return received_data - try: - transaction_timeout_secs = 30 - self._transaction_manager.start(transaction_timeout_secs) + def transmit(self, data, delay=True): + """Send `data` to the Notecard.""" + chunk_offset = 0 + data_left = len(data) + sent_in_seg = 0 - self._transmit(req) + while data_left > 0: + # Delay for 5ms. This prevents a fast host from hammering a + # slow/busy Notecard with requests. + time.sleep(.005) - if rsp_expected: - rsp = self._receive(30, 0.05, True) - finally: - self._transaction_manager.stop() + chunk_len = min(data_left, self.max) + write_data = data[chunk_offset:chunk_offset + chunk_len] + self._write(write_data) + + chunk_offset += chunk_len + data_left -= chunk_len + sent_in_seg += chunk_len + + if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: + sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN + + if delay: + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + + if delay: + time.sleep(CARD_REQUEST_I2C_CHUNK_DELAY_MS / 1000) + + def _transact(self, req_bytes, rsp_expected, + timeout_secs=CARD_INTER_TRANSACTION_TIMEOUT_SEC): + self.transmit(req_bytes) + + if not rsp_expected: + return + + # Delay for 5ms. This prevents a fast host from hammering a slow/busy + # Notecard with requests. + time.sleep(0.005) + + start = start_timeout() + available = 0 + while available == 0: + available, _, _ = self._read(0) + + if timeout_secs != 0 and has_timed_out(start, timeout_secs): + raise Exception(('Timed out while querying Notecard for ' + 'available data.')) - return rsp + return self.receive() - @i2c_lock def Reset(self): """Reset the Notecard.""" - # Send a newline to the Notecard to terminate any partial request that - # might be sitting in its input buffer. - self._transmit(b'\n') + if self._debug: + print('Resetting Notecard I2C communications.') - time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + notecard_ready = False + try: + self.lock() + + for i in range(CARD_RESET_SYNC_RETRIES): + try: + # Send a newline to the Notecard to terminate any partial + # request that might be sitting in its input buffer. + self._write(b'\n') + except Exception as e: + if self._debug: + print(e) + time.sleep(CARD_REQUEST_I2C_NACK_WAIT_MS / 1000) + continue - # Read from the Notecard until there's nothing left, retrying a max of 3 - # times. - retries = 3 - while retries > 0: - try: - self._receive(0, .001, False) - except: - retries -= 1 - else: - break + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) - if retries == 0: + something_found = False + non_control_char_found = False + + start = start_timeout() + read_len = 0 + while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): + try: + available, data_len, data = self._read(read_len) + except Exception as e: + if self._debug: + print(e) + time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) + continue + + if data_len > 0: + something_found = True + # The Notecard responds to a bare `\n` with `\r\n`. If + # we get any other characters back, it means the host + # and Notecard aren't synced up yet, and we need to + # transmit `\n` again. + for byte in data: + if byte != ord('\n') and byte != ord('\r'): + non_control_char_found = True + # Reset the timer with each non-control + # character. + start = start_timeout() + + read_len = min(available, self.max) + + time.sleep(CARD_REQUEST_I2C_CHUNK_DELAY_MS / 1000) + + if not something_found: + if self._debug: + print(('Notecard not responding to newline during ' + 'reset.')) + time.sleep(.005) + elif non_control_char_found: + if self._debug: + print(('Received non-control characters from the ' + 'Notecard during reset.')) + else: + # If all we got back is newlines, we're in sync with the + # Notecard. + notecard_ready = True + break + + if self._debug: + print('Retrying reset...') + + # Wait CARD_RESET_DRAIN_MS before trying again. + time.sleep(CARD_RESET_DRAIN_MS / 1000) + finally: + self.unlock() + + if not notecard_ready: raise Exception('Failed to reset Notecard.') - def _linux_write(self, length, data): + self._reset_required = False + + def _cpython_write(self, length, data): # noqa: D403 + """CPython implementation of serial-over-I2C write.""" msgs = [I2C.Message(length + data)] self.i2c.transfer(self.addr, msgs) - def _non_linux_write(self, length, data): + def _non_cpython_write(self, length, data): + """Non-CPython implementation of serial-over-I2C write.""" self.i2c.writeto(self.addr, length + data) - def _linux_read(self, initiate_read_msg, read_buf): - msgs = [I2C.Message(initiate_read_msg), I2C.Message(read_buf, read=True)] + def _cpython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """CPython implementation of serial-over-I2C read.""" + msgs = [ + I2C.Message(initiate_read_msg), + I2C.Message(read_buf, read=True) + ] self.i2c.transfer(self.addr, msgs) - read_buf = msgs[1].data - - return read_buf + read_bytes = msgs[1].data + read_buf[:len(read_bytes)] = read_bytes - def _micropython_read(self, initiate_read_msg, read_buf): + def _micropython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """MicroPython implementation of serial-over-I2C read.""" self.i2c.writeto(self.addr, initiate_read_msg, False) self.i2c.readfrom_into(self.addr, read_buf) - return read_buf - - def _circuitpython_read(self, initiate_read_msg, read_buf): + def _circuitpython_read(self, initiate_read_msg, read_buf): # noqa: D403 + """CircuitPython implementation of serial-over-I2C read.""" self.i2c.writeto_then_readfrom(self.addr, initiate_read_msg, read_buf) - return read_buf + def lock(self): + """Lock access to the I2C bus.""" + retries = 5 + while retries != 0: + if self.lock_fn(): + break + + retries -= 1 + # Try again after 100 ms. + time.sleep(.1) + + if retries == 0: + raise Exception('Failed to acquire I2C lock.') + + def unlock(self): + """Unlock access to the I2C bus.""" + self.unlock_fn() + + def _i2c_no_op_try_lock(*args, **kwargs): + """No-op lock function.""" + return True + + def _i2c_no_op_unlock(*args, **kwargs): + """No-op unlock function.""" + pass def __init__(self, i2c, address, max_transfer, debug=False): """Initialize the Notecard before a reset.""" @@ -485,41 +834,30 @@ def __init__(self, i2c, address, max_transfer, debug=False): self.i2c = i2c - def i2c_no_op_try_lock(*args, **kwargs): - """No-op lock function.""" - return True - - def i2c_no_op_unlock(*args, **kwargs): - """No-op unlock function.""" - pass - - use_i2c_lock = not use_periphery and sys.implementation.name != 'micropython' if use_i2c_lock: - self.lock = self.i2c.try_lock - self.unlock = self.i2c.unlock + self.lock_fn = self.i2c.try_lock + self.unlock_fn = self.i2c.unlock else: - self.lock = i2c_no_op_try_lock - self.unlock = i2c_no_op_unlock + self.lock_fn = self._i2c_no_op_try_lock + self.unlock_fn = self._i2c_no_op_unlock if address == 0: self.addr = NOTECARD_I2C_ADDRESS else: self.addr = address if max_transfer == 0: - self.max = 255 + self.max = NOTECARD_I2C_MAX_TRANSFER_DEFAULT else: self.max = max_transfer - if use_periphery: - self._platform_write = self._linux_write - self._platform_read = self._linux_read - elif sys.implementation.name == 'micropython': - self._platform_write = self._non_linux_write + if sys.implementation.name == 'micropython': + self._platform_write = self._non_cpython_write self._platform_read = self._micropython_read elif sys.implementation.name == 'circuitpython': - self._platform_write = self._non_linux_write + self._platform_write = self._non_cpython_write self._platform_read = self._circuitpython_read else: - raise NotImplementedError(f'Unsupported platform: {sys.implementation.name}') + self._platform_write = self._cpython_write + self._platform_read = self._cpython_read self.Reset() diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_i2c.py b/test/test_i2c.py new file mode 100644 index 0000000..144fc95 --- /dev/null +++ b/test/test_i2c.py @@ -0,0 +1,452 @@ +import os +import sys +import pytest +from unittest.mock import MagicMock, patch +from .unit_test_utils import TrueOnNthIteration, BooleanToggle + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import notecard # noqa: E402 + + +@pytest.fixture +def arrange_test(): + def _arrange_test(address=0, max_transfer=0, debug=False, + mock_locking=True): + # OpenI2C's __init__ will call Reset, which we don't care about + # actually doing here, so we mock Reset. + with patch('notecard.notecard.OpenI2C.Reset'): + card = notecard.OpenI2C(MagicMock(), address, max_transfer, + debug) + + if mock_locking: + card.lock = MagicMock() + card.unlock = MagicMock() + + return card + + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_test + + +@pytest.fixture +def arrange_reset_test(arrange_test): + def _arrange_reset_test(): + card = arrange_test() + card._write = MagicMock() + card._read = MagicMock() + + return card + + yield _arrange_reset_test + + +@pytest.fixture +def arrange_transact_test(arrange_test): + def _arrange_transact_test(): + card = arrange_test() + card.transmit = MagicMock() + card.receive = MagicMock() + req_bytes = card._prepare_request({'req': 'card.version'}) + + return card, req_bytes + + yield _arrange_transact_test + + +class TestI2C: + # Reset tests. + def test_reset_succeeds_on_good_notecard_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, 2, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + assert not card._reset_required + + def test_reset_sends_a_newline_to_clear_stale_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, 2, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card._write.assert_called_once_with(b'\n') + + def test_reset_locks_and_unlocks(self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (0, 2, b'\r\n') + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_unlocks_after_exception(self, arrange_reset_test): + card = arrange_reset_test() + card._write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_fails_if_continually_reads_non_control_chars( + self, arrange_reset_test): + card = arrange_reset_test() + card._read.return_value = (1, 1, b'h') + + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + def test_reset_required_if_reset_fails(self, arrange_reset_test): + card = arrange_reset_test() + card._write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + assert card._reset_required + + # __init__ tests. + def test_init_calls_reset(self): + with patch('notecard.notecard.OpenI2C.Reset') as reset_mock: + notecard.OpenI2C(MagicMock(), 0, 0) + + reset_mock.assert_called_once() + + @pytest.mark.parametrize( + 'addr_param,expected_addr', + [ + (0, notecard.NOTECARD_I2C_ADDRESS), + (7, 7) + ] + ) + def test_init_sets_address_correctly( + self, addr_param, expected_addr, arrange_test): + card = arrange_test(address=addr_param) + + assert card.addr == expected_addr + + @pytest.mark.parametrize( + 'max_param,expected_max', + [ + (0, notecard.NOTECARD_I2C_MAX_TRANSFER_DEFAULT), + (7, 7) + ] + ) + def test_init_sets_max_transfer_correctly( + self, max_param, expected_max, arrange_test): + card = arrange_test(max_transfer=max_param) + + assert card.max == expected_max + + @pytest.mark.parametrize('debug_param', [False, True]) + def test_init_sets_debug_correctly(self, debug_param, arrange_test): + card = arrange_test(debug=debug_param) + + assert card._debug == debug_param + + @pytest.mark.parametrize('use_i2c_lock', [False, True]) + def test_init_uses_appropriate_locking_functions( + self, use_i2c_lock, arrange_test): + with patch('notecard.notecard.use_i2c_lock', new=use_i2c_lock): + card = arrange_test() + + if use_i2c_lock: + assert card.lock_fn == card.i2c.try_lock + assert card.unlock_fn == card.i2c.unlock + else: + assert card.lock_fn.__func__ == \ + notecard.OpenI2C._i2c_no_op_try_lock + assert card.unlock_fn.__func__ == \ + notecard.OpenI2C._i2c_no_op_unlock + + @pytest.mark.parametrize( + 'platform,write_method,read_method', + [ + ( + 'micropython', + notecard.OpenI2C._non_cpython_write, + notecard.OpenI2C._micropython_read + ), + ( + 'circuitpython', + notecard.OpenI2C._non_cpython_write, + notecard.OpenI2C._circuitpython_read + ), + ( + 'cpython', + notecard.OpenI2C._cpython_write, + notecard.OpenI2C._cpython_read + ), + ] + ) + def test_init_sets_platform_hooks_correctly( + self, platform, write_method, read_method, arrange_test): + with patch('notecard.notecard.sys.implementation.name', new=platform): + card = arrange_test() + + assert card._platform_write.__func__ == write_method + assert card._platform_read.__func__ == read_method + + def test_user_agent_indicates_i2c_after_init(self, arrange_test): + card = arrange_test() + userAgent = card.GetUserAgent() + + assert userAgent['req_interface'] == 'i2c' + assert userAgent['req_port'] is not None + + # receive tests. + def test_receive_returns_all_data_bytes_from_read(self, arrange_test): + card = arrange_test() + payload = b'{}\r\n' + card._read = MagicMock() + card._read.side_effect = [ + # There are 4 bytes available to read, and there are no more bytes + # to read in this packet. + (4, 0, None), + # 0 bytes available to read after this packet. 4 coming in this + # packet, and they are {}\r\n. + (0, 4, payload) + ] + + rx_data = card.receive() + + assert rx_data == payload + + def test_receive_keeps_reading_if_data_available_after_newline( + self, arrange_test): + card = arrange_test() + payload = b'{}\r\n' + excess_data = b'io' + card._read = MagicMock() + card._read.side_effect = [ + # There are 4 bytes available to read, and there are no more bytes + # to read in this packet. + (4, 0, None), + # 2 bytes available to read after this packet. 4 coming in this + # packet, and they are {}\r\n. + (2, 4, payload), + # 0 bytes after this packet. 2 coming in this packet, and they are + # io. + (0, 2, excess_data) + ] + + rx_data = card.receive() + + assert rx_data == (payload + excess_data) + + def test_receive_raises_exception_on_timeout(self, arrange_test): + card = arrange_test() + payload = b'{}\r' + card._read = MagicMock() + card._read.side_effect = [ + # There are 3 bytes available to read, and there are no more bytes + # to read in this packet. + (4, 0, None), + # 0 bytes available to read after this packet. 3 coming in this + # packet, and they are {}\r. The lack of a newline at the end will + # cause this test to hit the timeout. + (0, 3, payload) + ] + + with patch('notecard.notecard.has_timed_out', return_value=True): + with pytest.raises(Exception, match=('Timed out while reading ' + 'data from the Notecard.')): + card.receive() + + # transmit tests. + def test_transmit_writes_all_data_bytes(self, arrange_test): + card = arrange_test() + # Create a bytearray to transmit. It should be larger than a single I2C + # chunk (i.e. greater than card.max), and it should not fall neatly onto + # a segment boundary. + data_len = card.max * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + write_mock = MagicMock() + card._write = write_mock + + card.transmit(data) + + # Using the argument history of the _write mock, assemble a bytearray of + # the data passed to write. + written = bytearray() + for write_call in write_mock.call_args_list: + segment = write_call[0][0] + written += segment + + # Verify that all the data we passed to transmit was in fact passed to + # uart.write. + assert data == written + + def test_transmit_does_not_exceed_max_transfer_size(self, arrange_test): + card = arrange_test() + # Create a bytearray to transmit. It should be larger than a single + # I2C chunk (i.e. greater than card.max), and it should not fall neatly + # onto a segment boundary. + data_len = card.max * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + write_mock = MagicMock() + card._write = write_mock + + card.transmit(data) + + for write_call in write_mock.call_args_list: + assert len(write_call[0][0]) <= card.max + + # _transact tests. + def test_transact_calls_transmit_with_req_bytes( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + card._transact(req_bytes, rsp_expected=False) + + card.transmit.assert_called_once_with(req_bytes) + + def test_transact_returns_none_if_rsp_not_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + rsp = card._transact(req_bytes, rsp_expected=False) + + assert rsp is None + + def test_transact_returns_not_none_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(1, None, None)) + + rsp = card._transact(req_bytes, rsp_expected=True) + + assert rsp is not None + + def test_transact_calls_receive_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(1, None, None)) + + card._transact(req_bytes, rsp_expected=True) + + card.receive.assert_called_once() + + def test_transact_raises_exception_on_timeout(self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._read = MagicMock(return_value=(0, None, None)) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, + match=('Timed out while querying Notecard for ' + 'available data.')): + card._transact(req_bytes, rsp_expected=True) + + # _read tests + def test_read_sends_the_initial_read_packet_correctly(self, arrange_test): + card = arrange_test() + card._platform_read = MagicMock() + length = 12 + # To start a read from the Notecard using serial-over-I2C, the host + # should send a 0 byte followed by a byte with the requested read + # length. + expected_packet = bytearray(2) + expected_packet[0] = 0 + expected_packet[1] = length + + card._read(length) + + card._platform_read.assert_called_once() + assert card._platform_read.call_args[0][0] == expected_packet + + def test_read_sizes_read_buf_correctly(self, arrange_test): + card = arrange_test() + card._platform_read = MagicMock() + header_length = 2 + data_length = 12 + expected_read_buffer_length = header_length + data_length + + card._read(data_length) + + card._platform_read.assert_called_once() + assert card._platform_read.call_args[0][1] == bytearray(expected_read_buffer_length) + + def test_read_parses_data_correctly(self, arrange_test): + available = 8 + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + + def platform_read_side_effect(initiate_read_msg, read_buf): + read_buf[0] = available + read_buf[1] = data_len + read_buf[2:] = data + + card = arrange_test() + card._platform_read = MagicMock(side_effect=platform_read_side_effect) + + actual_available, actual_data_len, actual_data = card._read(len(data)) + + card._platform_read.assert_called_once() + assert actual_available == available + assert actual_data_len == data_len + assert actual_data == data + + def test_write_calls_platform_write_correctly(self, arrange_test): + card = arrange_test() + card._platform_write = MagicMock() + data = bytearray([0xDE, 0xAD, 0xBE, 0xEF]) + + card._write(data) + + card._platform_write.assert_called_once_with( + bytearray([len(data)]), data) + + # lock tests. + def test_lock_calls_lock_fn(self, arrange_test): + card = arrange_test(mock_locking=False) + card.lock_fn = MagicMock(return_value=True) + + card.lock() + + card.lock_fn.assert_called() + + def test_lock_retries_lock_fn_if_needed(self, arrange_test): + card = arrange_test(mock_locking=False) + # Fails the first time and succeeds the second time. + card.lock_fn = MagicMock(side_effect=[False, True]) + + card.lock() + + assert card.lock_fn.call_count == 2 + + def test_lock_raises_exception_if_lock_fn_never_returns_true( + self, arrange_test): + card = arrange_test(mock_locking=False) + card.lock_fn = MagicMock(return_value=False) + + with pytest.raises(Exception, match='Failed to acquire I2C lock.'): + card.lock() + + # unlock tests. + def test_unlock_calls_unlock_fn(self, arrange_test): + card = arrange_test(mock_locking=False) + card.unlock_fn = MagicMock() + + card.unlock() + + card.unlock_fn.assert_called() diff --git a/test/test_notecard.py b/test/test_notecard.py index 7bcd4b5..395e381 100644 --- a/test/test_notecard.py +++ b/test/test_notecard.py @@ -1,46 +1,411 @@ import os import sys import pytest -from unittest.mock import Mock, MagicMock, patch -import periphery +from unittest.mock import MagicMock, patch import json +import re sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import notecard # noqa: E402 +from notecard.transaction_manager import TransactionManager, NoOpTransactionManager # noqa: E402 -def get_serial_and_port(): - serial = Mock() # noqa: F811 - port = serial.Serial("/dev/tty.foo", 9600) - port.read.side_effect = [b'\r', b'\n', None] - port.readline.return_value = "\r\n" +@pytest.fixture +def arrange_transaction_test(): + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + def _arrange_transaction_test(): + card = notecard.Notecard() + card.Reset = MagicMock() + card.lock = MagicMock() + card.unlock = MagicMock() + card._transact = MagicMock(return_value=b'{}\r\n') + card._crc_error = MagicMock(return_value=False) + + return card + + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_transaction_test - # Patch the Reset method so that we don't actually call it during __init__. - with patch('notecard.notecard.OpenSerial.Reset'): - nCard = notecard.OpenSerial(port) - - return (nCard, port) - - -def get_i2c_and_port(): - periphery = Mock() # noqa: F811 - port = periphery.I2C("dev/i2c-foo") - port.try_lock.return_value = True - - # Patch the Reset method so that we don't actually call it during __init__. - with patch('notecard.notecard.OpenI2C.Reset'): - nCard = notecard.OpenI2C(port, 0x17, 255) - - return (nCard, port) - - -class NotecardTest: +class TestNotecard: + # _transaction_manager tests. + def test_txn_manager_is_no_op_before_pins_set(self): + card = notecard.Notecard() + + assert isinstance(card._transaction_manager, NoOpTransactionManager) + + def test_txn_manager_is_valid_after_pins_set(self): + card = notecard.Notecard() + with patch('notecard.notecard.TransactionManager', autospec=True): + card.SetTransactionPins(1, 2) + + assert isinstance(card._transaction_manager, TransactionManager) + + # _crc_add tests + def test_crc_add_adds_a_crc_field(self): + card = notecard.Notecard() + req = {'req': 'hub.status'} + + card._crc_add(req, 0) + + assert 'crc' in req + + def test_crc_add_formats_the_crc_field_correctly(self): + card = notecard.Notecard() + req = {'req': 'hub.status'} + seq_number = 37 + + card._crc_add(req, seq_number) + + # The format should be SSSS:CCCCCCCC, where S and C are hex digits + # comprising the sequence number and CRC32, respectively. + pattern = r'^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{8}$' + assert re.match(pattern, req['crc']) + + # _crc_error tests. + @pytest.mark.parametrize('crc_supported', [False, True]) + def test_crc_error_handles_lack_of_crc_field_correctly(self, crc_supported): + card = notecard.Notecard() + card._card_supports_crc = crc_supported + rsp_bytes = b'{}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error == crc_supported + + def test_crc_error_returns_error_if_sequence_number_wrong(self): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + # Sequence number should be 37 (0x25), but the response has 38 (0x26). + rsp_bytes = b'{"crc":"0026:A3A6BF43"}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error + + def test_crc_error_returns_error_if_crc_wrong(self): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + # CRC should be A3A6BF43. + rsp_bytes = b'{"crc":"0025:A3A6BF44"}\r\n' + + error = card._crc_error(rsp_bytes) + + assert error + + @pytest.mark.parametrize( + 'rsp_bytes', + [ + # Without CRC, the response is {}. + b'{"crc":"0025:A3A6BF43"}\r\n', + # Without CRC, the response is {"connected": true}. This makes sure + # _crc_error handles the "," between the two fields properly. + b'{"connected": true,"crc": "0025:025A2457"}\r\n' + ] + ) + def test_crc_error_returns_no_error_if_sequence_number_and_crc_ok( + self, rsp_bytes): + card = notecard.Notecard() + seq_number = 37 + card._last_request_seq_number = seq_number + + error = card._crc_error(rsp_bytes) + + assert not error + + # Transaction tests. + def arrange_transaction_test(self): + card = notecard.Notecard() + card.Reset = MagicMock() + card.lock = MagicMock() + card.unlock = MagicMock() + card._transact = MagicMock(return_value=b'{}\r\n') + card._crc_error = MagicMock(return_value=False) + + return card + + @pytest.mark.parametrize('reset_required', [False, True]) + def test_transaction_calls_reset_if_needed( + self, arrange_transaction_test, reset_required): + card = arrange_transaction_test() + card._reset_required = reset_required + req = {"req": "hub.status"} + + card.Transaction(req) + + if reset_required: + card.Reset.assert_called_once() + else: + card.Reset.assert_not_called() + + @pytest.mark.parametrize('lock', [False, True]) + def test_transaction_handles_locking_correctly( + self, arrange_transaction_test, lock): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + card.Transaction(req, lock=lock) + + if lock: + card.lock.assert_called_once() + card.unlock.assert_called_once() + else: + card.lock.assert_not_called() + card.unlock.assert_not_called() + + @pytest.mark.parametrize('lock', [False, True]) + def test_transaction_handles_locking_after_exception_correctly( + self, arrange_transaction_test, lock): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises(Exception, match='Failed to transact with Notecard.'): + card.Transaction(req, lock=lock) + + if lock: + card.lock.assert_called_once() + card.unlock.assert_called_once() + else: + card.lock.assert_not_called() + card.unlock.assert_not_called() + + def test_transaction_calls_txn_manager_start_and_stop( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transaction_manager = MagicMock() + req = {"req": "hub.status"} + + card.Transaction(req) + + card._transaction_manager.start.assert_called_once() + card._transaction_manager.stop.assert_called_once() + + def test_transaction_calls_txn_manager_stop_after_exception( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transaction_manager = MagicMock() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card._transaction_manager.start.assert_called_once() + card._transaction_manager.stop.assert_called_once() + + def test_transaction_calls_reset_if_transact_fails( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._reset_required = False + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card.Reset.assert_called() + + def test_transaction_retries_on_transact_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_crc_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._crc_error.return_value = True + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_failure_to_parse_json_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + side_effect=Exception('json.loads failed.')): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_retries_on_io_error_in_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + return_value={'err': 'some {io} error'}): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == \ + notecard.CARD_TRANSACTION_RETRIES + + def test_transaction_does_not_retry_on_bad_bin_error_in_response( + self, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + + with patch('notecard.notecard.json.loads', + return_value={'err': 'a {bad-bin} error'}): + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._transact.call_count == 1 + + @pytest.mark.parametrize( + 'rsp_expected,return_type', + [ + (False, type(None)), + (True, dict) + ] + ) + def test_transaction_returns_proper_type( + self, rsp_expected, return_type, arrange_transaction_test): + card = arrange_transaction_test() + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + + rsp_json = card.Transaction(req) + + assert isinstance(rsp_json, return_type) + + def test_transaction_does_not_retry_if_transact_fails_and_no_response_expected( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock(return_value=(req_bytes, False)) + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + card._transact.assert_called_once() + + @pytest.mark.parametrize('rsp_expected', [False, True]) + def test_transaction_increments_sequence_number_on_success( + self, rsp_expected, arrange_transaction_test): + card = arrange_transaction_test() + seq_number_before = card._last_request_seq_number + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + + card.Transaction(req) + + seq_number_after = card._last_request_seq_number + assert seq_number_after == seq_number_before + 1 + + @pytest.mark.parametrize('rsp_expected', [False, True]) + def test_transaction_increments_sequence_number_after_exception( + self, rsp_expected, arrange_transaction_test): + card = arrange_transaction_test() + seq_number_before = card._last_request_seq_number + req = {"req": "hub.status"} + req_bytes = json.dumps(req).encode('utf-8') + card._prepare_request = MagicMock( + return_value=(req_bytes, rsp_expected)) + card._transact.side_effect = Exception('_transact failed.') + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + seq_number_after = card._last_request_seq_number + assert seq_number_after == seq_number_before + 1 + + def test_transaction_queues_up_a_reset_on_error( + self, arrange_transaction_test): + card = arrange_transaction_test() + card._reset_required = False + card._transact.side_effect = Exception('_transact failed.') + req = {"req": "hub.status"} + + with pytest.raises( + Exception, match='Failed to transact with Notecard.'): + card.Transaction(req) + + assert card._reset_required + + # Command tests. + def test_command_returns_none(self): + card = notecard.Notecard() + card.Transaction = MagicMock() + + rsp = card.Command({'cmd': 'card.sleep'}) + + # A command generates no response, by definition. + assert rsp is None + + def test_command_fails_if_given_req(self): + card = notecard.Notecard() + + # Can't issue a command with 'req', must use 'cmd'. + with pytest.raises(Exception): + card.Command({'req': 'card.sleep'}) + + # UserAgentSent tests. + def test_user_agent_not_sent_before_hub_set(self): + card = notecard.Notecard() + + assert not card.UserAgentSent() + + @pytest.mark.parametrize( + 'request_method,request_key', + [ + ('Transaction', 'req'), + ('Command', 'cmd') + ] + ) + def test_user_agent_sent_after_hub_set(self, arrange_transaction_test, + request_method, request_key): + card = arrange_transaction_test() + + req = dict() + req[request_key] = 'hub.set' + method = getattr(card, request_method) + method(req) + + assert card.UserAgentSent() + + # GetUserAgent tests. def test_get_user_agent(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() + card = notecard.Notecard() + userAgent = card.GetUserAgent() assert userAgent['agent'] == 'note-python' assert userAgent['os_name'] is not None @@ -48,153 +413,22 @@ def test_get_user_agent(self): assert userAgent['os_version'] is not None assert userAgent['os_family'] is not None - def test_transaction(self): - nCard, port = self.get_port("{\"connected\":true}\r\n") - - response = nCard.Transaction({"req": "hub.status"}) - - assert "connected" in response - assert response["connected"] is True - - @patch('notecard.notecard.TransactionManager') - def test_setting_transaction_pins(self, transaction_manager_mock): - nCard, _ = self.get_port("{\"connected\":true}\r\n") - - nCard.SetTransactionPins(1, 2) - nCard.Transaction({"req": "hub.status"}) - - # If transaction pins have been set, start and stop should be called - # once for each Transaction call. - nCard._transaction_manager.start.assert_called_once() - nCard._transaction_manager.stop.assert_called_once() - - def test_command(self): - nCard, port = self.get_port() - - response = nCard.Command({"cmd": "card.sleep"}) - - assert response is None - - def test_command_fail_if_req(self): - nCard, port = self.get_port() - - with pytest.raises(Exception, - match="Please use 'cmd' instead of 'req'"): - nCard.Command({"req": "card.sleep"}) - - def test_user_agent_sent_is_false_before_hub_set(self): - nCard, _ = self.get_port() - - assert nCard.UserAgentSent() is False - - def test_send_user_agent_in_hub_set_transaction(self): - nCard, port = self.get_port("{\"connected\":true}\r\n") - - nCard.Transaction({"req": "hub.set"}) - - assert nCard.UserAgentSent() is True - - def get_port(self, response=None): - raise NotImplementedError("subclasses must implement `get_port()`") - - -class TestNotecardMockSerial(NotecardTest): - - def get_port(self, response=None): - nCard, port = get_serial_and_port() - if response is not None: - port.readline.return_value = response - return (nCard, port) - - def test_user_agent_is_serial_when_serial_used(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() - - assert userAgent['req_interface'] == 'serial' - assert userAgent['req_port'] is not None - - def test_open_serial(self): - nCard, _ = get_serial_and_port() - - assert nCard.uart is not None - - def test_debug_mode_on_serial(self): - # Patch the Reset method so that we don't actually call it during - # __init__. - with patch('notecard.notecard.OpenSerial.Reset'): - port = MagicMock() - nCard = notecard.OpenSerial(port, debug=True) - - assert nCard._debug - - -class TestNotecardMockI2C(NotecardTest): - - def get_port(self, response=None): - nCard, port = get_i2c_and_port() - if response is not None: - chunklen = 0 - tosend = bytes(response, 'utf-8') - - def writeto_then_readfrom(addr, write, read): - nonlocal chunklen, tosend - read[0] = len(tosend) - read[1] = chunklen - read[2:2 + chunklen] = tosend[0:chunklen] - tosend = tosend[chunklen:] - chunklen = len(tosend) - - def transfer(addr, messages: periphery.I2C.Message): - if len(messages) == 2 and messages[1].read: - read = messages[1].data - writeto_then_readfrom(addr, messages[0].data, read) - - port.writeto_then_readfrom = writeto_then_readfrom - port.transfer = transfer - return (nCard, port) - - def test_open_i2c(self): - nCard, _ = get_i2c_and_port() - - assert nCard.i2c is not None - - def test_user_agent_is_i2c_when_i2c_used(self): - nCard, _ = self.get_port() - userAgent = nCard.GetUserAgent() - - assert userAgent['req_interface'] == 'i2c' - assert userAgent['req_port'] is not None - - def test_debug_mode_on_i2c(self): - periphery = Mock() # noqa: F811 - port = periphery.I2C("dev/i2c-foo") - port.try_lock.return_value = True - - nCard = notecard.OpenI2C(port, 0x17, 255, debug=True) - - assert nCard._debug - - -class MockNotecard(notecard.Notecard): - - def Reset(self): - pass - - -class TestUserAgent: - - def setUserAgentInfo(self, info=None): - nCard = MockNotecard() - orgReq = {"req": "hub.set"} - nCard.SetAppUserAgent(info) - req = json.loads(nCard._prepare_request(orgReq)) + # SetAppUserAgent tests. + def set_user_agent_info(self, info=None): + card = notecard.Notecard() + req = {"req": "hub.set"} + card.SetAppUserAgent(info) + req = json.loads(card._prepare_request(req)[0]) return req - def test_amends_hub_set_request(self): - req = self.setUserAgentInfo() + def test_set_app_user_agent_amends_hub_set_request(self): + req = self.set_user_agent_info() + assert req['body'] is not None - def test_adds_app_info(self): + def test_set_app_user_agent_adds_app_info_to_hub_set_request(self): info = {"app": "myapp"} - req = self.setUserAgentInfo(info) + + req = self.set_user_agent_info(info) + assert req['body']['app'] == 'myapp' diff --git a/test/test_serial.py b/test/test_serial.py new file mode 100644 index 0000000..5f6b1e1 --- /dev/null +++ b/test/test_serial.py @@ -0,0 +1,341 @@ +import os +import sys +import pytest +from unittest.mock import MagicMock, patch +from filelock import FileLock +from contextlib import AbstractContextManager +from .unit_test_utils import TrueOnNthIteration, BooleanToggle + +sys.path.insert(0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import notecard # noqa: E402 +from notecard import NoOpSerialLock, NoOpContextManager # noqa: E402 + + +@pytest.fixture +def arrange_test(): + def _arrange_test(debug=False): + # OpenSerial's __init__ will call Reset, which we don't care about + # actually doing here, so we mock Reset. + with patch('notecard.notecard.OpenSerial.Reset'): + card = notecard.OpenSerial(MagicMock(), debug=debug) + + return card + + # Mocking time.sleep makes the tests run faster because no actual sleeping + # occurs. + with patch('notecard.notecard.time.sleep'): + # Yield instead of return so that the time.sleep patch is active for the + # duration of the test. + yield _arrange_test + + +@pytest.fixture +def arrange_reset_test(arrange_test): + def _arrange_reset_test(): + card = arrange_test() + card.lock = MagicMock() + card.unlock = MagicMock() + card.uart.write = MagicMock() + + return card + + yield _arrange_reset_test + + +@pytest.fixture +def tramsit_test_data(): + # Create a bytearray to transmit. It should be larger than a single segment, + # and it should not fall neatly onto a segment boundary. + data_len = notecard.CARD_REQUEST_SEGMENT_MAX_LEN * 2 + 15 + data = bytearray(i % 256 for i in range(data_len)) + + return data + + +@pytest.fixture +def arrange_transact_test(arrange_test): + def _arrange_transact_test(): + card = arrange_test() + card.transmit = MagicMock() + card.receive = MagicMock() + req_bytes = card._prepare_request({'req': 'card.version'}) + + return card, req_bytes + + yield _arrange_transact_test + + +class TestSerial: + # Reset tests. + def test_reset_succeeds_on_good_notecard_response(self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + assert not card._reset_required + + def test_reset_sends_a_newline_to_clear_stale_response( + self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.uart.write.assert_called_once_with(b'\n') + + def test_reset_locks_and_unlocks(self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=[True, True, False]) + card._read_byte = MagicMock(side_effect=[b'\r', b'\n', None]) + + with patch('notecard.notecard.has_timed_out', + side_effect=TrueOnNthIteration(2)): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_unlocks_after_exception(self, arrange_reset_test): + card = arrange_reset_test() + card.uart.write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + card.lock.assert_called_once() + card.unlock.assert_called_once() + + def test_reset_fails_if_continually_reads_non_control_chars( + self, arrange_reset_test): + card = arrange_reset_test() + card._available = MagicMock(side_effect=BooleanToggle(True)) + card._read_byte = MagicMock(return_value=b'h') + + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + def test_reset_required_if_reset_fails(self, arrange_reset_test): + card = arrange_reset_test() + card.uart.write.side_effect = Exception('write failed.') + + with pytest.raises(Exception, match='Failed to reset Notecard.'): + card.Reset() + + assert card._reset_required + + # __init__ tests. + @patch('notecard.notecard.OpenSerial.Reset') + def test_init_calls_reset(self, reset_mock): + notecard.OpenSerial(MagicMock()) + + reset_mock.assert_called_once() + + @pytest.mark.parametrize( + 'use_serial_lock,lock_type', + [ + (False, NoOpSerialLock), + (True, FileLock) + ] + ) + def test_init_creates_appropriate_lock_type( + self, use_serial_lock, lock_type, arrange_test): + with patch('notecard.notecard.use_serial_lock', new=use_serial_lock): + card = arrange_test() + + assert isinstance(card.lock_handle, lock_type) + + @pytest.mark.parametrize( + 'platform,available_method', + [ + ('micropython', notecard.OpenSerial._available_micropython), + ('cpython', notecard.OpenSerial._available_default), + ('circuitpython', notecard.OpenSerial._available_default), + ] + ) + def test_available_method_is_set_correctly_on_init( + self, platform, available_method, arrange_test): + with patch('notecard.notecard.sys.implementation.name', new=platform): + card = arrange_test() + + assert card._available.__func__ == available_method + + @pytest.mark.parametrize('debug', [False, True]) + def test_debug_set_correctly_on_init(self, debug, arrange_test): + card = arrange_test(debug) + + assert card._debug == debug + + def test_user_agent_indicates_serial_after_init(self, arrange_test): + card = arrange_test() + userAgent = card.GetUserAgent() + + assert userAgent['req_interface'] == 'serial' + assert userAgent['req_port'] is not None + + # _transact tests. + def test_transact_calls_transmit_with_req_bytes( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + card._transact(req_bytes, rsp_expected=False) + + card.transmit.assert_called_once_with(req_bytes) + + def test_transact_returns_none_if_rsp_not_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + + rsp = card._transact(req_bytes, rsp_expected=False) + + assert rsp is None + + def test_transact_returns_not_none_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=True) + + rsp = card._transact(req_bytes, rsp_expected=True) + + assert rsp is not None + + def test_transact_calls_receive_if_rsp_expected( + self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=True) + + card._transact(req_bytes, rsp_expected=True) + + card.receive.assert_called_once() + + def test_transact_raises_exception_on_timeout(self, arrange_transact_test): + card, req_bytes = arrange_transact_test() + card._available = MagicMock(return_value=False) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=BooleanToggle(False)): + with pytest.raises(Exception, + match=('Timed out while querying Notecard for ' + 'available data.')): + card._transact(req_bytes, rsp_expected=True) + + # transmit tests. + def test_transmit_writes_all_data_bytes( + self, arrange_test, tramsit_test_data): + card = arrange_test() + card.uart.write = MagicMock() + + card.transmit(tramsit_test_data, True) + + # Using the argument history of the uart.write mock, assemble a + # bytearray of the data passed to uart.write. + written = bytearray() + for write_call in card.uart.write.call_args_list: + segment = write_call[0][0] + written += segment + # Verify that all the data we passed to transmit was in fact passed to + # uart.write. + assert tramsit_test_data == written + + def test_transmit_does_not_exceed_max_segment_length( + self, arrange_test, tramsit_test_data): + card = arrange_test() + card.uart.write = MagicMock() + + card.transmit(tramsit_test_data) + + for write_call in card.uart.write.call_args_list: + segment = write_call.args[0] + assert len(segment) <= notecard.CARD_REQUEST_SEGMENT_MAX_LEN + + # receive tests. + def test_receive_raises_exception_on_timeout(self, arrange_test): + card = arrange_test() + card._available = MagicMock(return_value=False) + + # Force a timeout. + with patch('notecard.notecard.has_timed_out', + side_effect=[False, True]): + with pytest.raises(Exception, match=('Timed out waiting to receive ' + 'data from Notecard.')): + card.receive() + + def test_receive_returns_all_bytes_from_read_byte( + self, arrange_test): + card = arrange_test() + read_byte_mock = MagicMock() + read_byte_mock.side_effect = [b'{', b'}', b'\r', b'\n'] + card._read_byte = read_byte_mock + card._available = MagicMock(return_value=True) + expected_data = bytearray('{}\r\n'.encode('utf-8')) + + data = card.receive() + + # Verify that all the bytes returned by _read_byte were returned as a + # bytearray by receive. + assert data == expected_data + + # _read_byte tests. + def test_read_byte_calls_uart_read(self, arrange_test): + card = arrange_test() + card.uart.read = MagicMock() + + card._read_byte() + + card.uart.read.assert_called_once_with(1) + + # NoOpSerialLock tests. + def test_no_op_serial_lock_implements_acquire_and_release(self): + no_op_lock = NoOpSerialLock() + + assert hasattr(no_op_lock, 'acquire') + assert hasattr(no_op_lock, 'release') + + def test_no_op_serial_lock_acquire_returns_no_op_context_manager(self): + no_op_lock = NoOpSerialLock() + + assert isinstance(no_op_lock.acquire(), NoOpContextManager) + + def test_no_op_serial_lock_acquire_accepts_timeout_arg(self): + no_op_lock = NoOpSerialLock() + + no_op_lock.acquire(timeout=10) + + # NoOpContextManager tests. + def test_no_op_context_manager_is_a_context_manager(self): + manager = NoOpContextManager() + + with manager: + pass + + assert isinstance(manager, AbstractContextManager) + + # lock/unlock tests. + def test_lock_calls_acquire_on_underlying_lock(self, arrange_test): + card = arrange_test() + lock_handle_mock = MagicMock() + card.lock_handle = lock_handle_mock + + card.lock() + + lock_handle_mock.acquire.assert_called_once() + + def test_unlock_calls_release_on_underlying_lock(self, arrange_test): + card = arrange_test() + lock_handle_mock = MagicMock() + card.lock_handle = lock_handle_mock + + card.unlock() + + lock_handle_mock.release.assert_called_once() diff --git a/test/unit_test_utils.py b/test/unit_test_utils.py new file mode 100644 index 0000000..9bc8ad1 --- /dev/null +++ b/test/unit_test_utils.py @@ -0,0 +1,40 @@ +class TrueOnNthIteration: + """Iterable that returns False until Nth iteration, then it returns True.""" + + def __init__(self, n): + """Set the iteration to return True on.""" + self.n = n + + def __iter__(self): + self.current = 1 + return self + + def __next__(self): + if self.current > self.n: + raise StopIteration + elif self.current == self.n: + result = True + else: + result = False + + self.current += 1 + + return result + + +class BooleanToggle: + """Iterable that returns a toggling boolean.""" + + def __init__(self, initial_value): + """Set the initial state (i.e. False or True).""" + self.initial_value = initial_value + + def __iter__(self): + self.current = self.initial_value + return self + + def __next__(self): + result = self.current + self.current = not self.current + + return result From 5058b984dc950e1bfed0ff7bd916f1111ec0dfee Mon Sep 17 00:00:00 2001 From: Hayden Roche Date: Sun, 1 Oct 2023 10:57:45 -0700 Subject: [PATCH 2/4] Fix a couple problems. - Fail example scripts if an exception is raised at any point. - Rework `_crc_add` to produce a reliable CRC. This requires us to rely more on string manipulation and less on python dicts. Prior to this commit, the CRC was computed over the string representation of the request (via `json.dumps`) and then it was added to the request dict. Then, when it was time to serialize the request for transmission and we converted the dict to a string again, because the CRC field had been added, the order of the fields changed and the CRC was invalidated. This is because Python dicts are fundamentally unordered collections. - Add missing timeout reset to `OpenSerial`'s `receive` method. - Increase cpy_example.py UART RX buffer size. I was seeing the end of responses get cut off with the default size (64). Increased to 128. --- examples/notecard-basics/cpy_example.py | 42 ++++++------------------- examples/notecard-basics/mpy_example.py | 41 +++++------------------- notecard/notecard.py | 26 ++++++++++----- test/test_notecard.py | 14 +++++---- 4 files changed, 43 insertions(+), 80 deletions(-) diff --git a/examples/notecard-basics/cpy_example.py b/examples/notecard-basics/cpy_example.py index 5ab42ec..bf6db92 100644 --- a/examples/notecard-basics/cpy_example.py +++ b/examples/notecard-basics/cpy_example.py @@ -14,20 +14,6 @@ import busio # noqa: E402 -def NotecardExceptionInfo(exception): - """Construct a formatted Exception string. - - Args: - exception (Exception): An exception object. - - Returns: - string: a summary of the exception with line number and details. - """ - name = exception.__class__.__name__ - return sys.platform + ": " + name \ - + ": " + " ".join(map(str, exception.args)) - - def configure_notecard(card, product_uid): """Submit a simple JSON-based request to the Notecard. @@ -39,11 +25,7 @@ def configure_notecard(card, product_uid): req["product"] = product_uid req["mode"] = "continuous" - try: - card.Transaction(req) - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + card.Transaction(req) def get_temp_and_voltage(card): @@ -53,20 +35,13 @@ def get_temp_and_voltage(card): card (object): An instance of the Notecard class """ - temp = 0 - voltage = 0 - - try: - req = {"req": "card.temp"} - rsp = card.Transaction(req) - temp = rsp["value"] + req = {"req": "card.temp"} + rsp = card.Transaction(req) + temp = rsp["value"] - req = {"req": "card.voltage"} - rsp = card.Transaction(req) - voltage = rsp["value"] - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + req = {"req": "card.voltage"} + rsp = card.Transaction(req) + voltage = rsp["value"] return temp, voltage @@ -75,7 +50,8 @@ def run_example(product_uid, use_uart=True): """Connect to Notcard and run a transaction test.""" print("Opening port...") if use_uart: - port = busio.UART(board.TX, board.RX, baudrate=9600) + port = busio.UART(board.TX, board.RX, baudrate=9600, + receiver_buffer_size=128) else: port = busio.I2C(board.SCL, board.SDA) diff --git a/examples/notecard-basics/mpy_example.py b/examples/notecard-basics/mpy_example.py index ba0c68e..a1cee74 100644 --- a/examples/notecard-basics/mpy_example.py +++ b/examples/notecard-basics/mpy_example.py @@ -16,20 +16,6 @@ from machine import Pin -def NotecardExceptionInfo(exception): - """Construct a formatted Exception string. - - Args: - exception (Exception): An exception object. - - Returns: - string: a summary of the exception with line number and details. - """ - name = exception.__class__.__name__ - return sys.platform + ": " + name + ": " \ - + " ".join(map(str, exception.args)) - - def configure_notecard(card, product_uid): """Submit a simple JSON-based request to the Notecard. @@ -41,11 +27,7 @@ def configure_notecard(card, product_uid): req["product"] = product_uid req["mode"] = "continuous" - try: - card.Transaction(req) - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + card.Transaction(req) def get_temp_and_voltage(card): @@ -55,20 +37,13 @@ def get_temp_and_voltage(card): card (object): An instance of the Notecard class """ - temp = 0 - voltage = 0 - - try: - req = {"req": "card.temp"} - rsp = card.Transaction(req) - temp = rsp["value"] - - req = {"req": "card.voltage"} - rsp = card.Transaction(req) - voltage = rsp["value"] - except Exception as exception: - print("Transaction error: " + NotecardExceptionInfo(exception)) - time.sleep(5) + req = {"req": "card.temp"} + rsp = card.Transaction(req) + temp = rsp["value"] + + req = {"req": "card.voltage"} + rsp = card.Transaction(req) + voltage = rsp["value"] return temp, voltage diff --git a/notecard/notecard.py b/notecard/notecard.py index 0aa9e84..15fdeb9 100644 --- a/notecard/notecard.py +++ b/notecard/notecard.py @@ -129,7 +129,7 @@ def __init__(self, debug=False): self._card_supports_crc = False self._reset_required = True - def _crc_add(self, req, seq_number): + def _crc_add(self, req_string, seq_number): """Add a CRC field to the request. The CRC field also contains a sequence number and has this format: @@ -139,10 +139,18 @@ def _crc_add(self, req, seq_number): SSSS is the sequence number encoded as a string of 4 hex digits. CCCCCCCC is the CRC32 encoded as a string of 8 hex digits. """ - req_bytes = json.dumps(req, separators=(',', ':')).encode('utf-8') + req_bytes = req_string.encode('utf-8') crc_hex = '{:08x}'.format(crc32(req_bytes)) seq_number_hex = '{:04x}'.format(seq_number) - req['crc'] = f'{seq_number_hex}:{crc_hex}' + crc_field = f'"crc":"{seq_number_hex}:{crc_hex}"' + req_string_w_crc = req_string[:-1] + if req_string[-2] == '{': + req_string_w_crc += f'{crc_field}' + else: + req_string_w_crc += f',{crc_field}' + req_string_w_crc += '}' + + return req_string_w_crc def _crc_error(self, rsp_bytes): """Check the CRC in a Notecard response.""" @@ -199,12 +207,13 @@ def _prepare_request(self, req): rsp_expected = 'req' in req # If this is a request and not a command, add a CRC. + req_string = json.dumps(req, separators=(',', ':')) if rsp_expected: - self._crc_add(req, self._last_request_seq_number) + req_string = self._crc_add(req_string, + self._last_request_seq_number) # Serialize the JSON request to a string, removing any unnecessary # whitespace. - req_string = json.dumps(req, separators=(',', ':')) if self._debug: print(req_string) @@ -320,7 +329,7 @@ def Transaction(self, req, lock=True): if '{io}' in rsp_json['err']: if self._debug: print(('Response has error field indicating I/O' - ' error.')) + f' error: {rsp_json}')) error = True retries_left -= 1 @@ -329,8 +338,8 @@ def Transaction(self, req, lock=True): elif '{bad-bin}' in rsp_json['err']: if self._debug: print(('Response has error field indicating ' - 'binary I/O error. Not eligible for ' - 'retry.')) + f'binary I/O error: {rsp_json}')) + print('Not eligible for retry.') error = True break @@ -433,6 +442,7 @@ def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, time.sleep(.001) timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC + start = start_timeout() byte = self._read_byte() data.extend(byte) received_newline = byte == b'\n' diff --git a/test/test_notecard.py b/test/test_notecard.py index 395e381..fcf8416 100644 --- a/test/test_notecard.py +++ b/test/test_notecard.py @@ -49,23 +49,25 @@ def test_txn_manager_is_valid_after_pins_set(self): # _crc_add tests def test_crc_add_adds_a_crc_field(self): card = notecard.Notecard() - req = {'req': 'hub.status'} + req = '{"req":"hub.status"}' - card._crc_add(req, 0) + req_string = card._crc_add(req, 0) - assert 'crc' in req + req_json = json.loads(req_string) + assert 'crc' in req_json def test_crc_add_formats_the_crc_field_correctly(self): card = notecard.Notecard() - req = {'req': 'hub.status'} + req = '{"req":"hub.status"}' seq_number = 37 - card._crc_add(req, seq_number) + req_string = card._crc_add(req, seq_number) + req_json = json.loads(req_string) # The format should be SSSS:CCCCCCCC, where S and C are hex digits # comprising the sequence number and CRC32, respectively. pattern = r'^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{8}$' - assert re.match(pattern, req['crc']) + assert re.match(pattern, req_json['crc']) # _crc_error tests. @pytest.mark.parametrize('crc_supported', [False, True]) From 22885cefd3cefff08ebc54a3b9088ace989bf3d1 Mon Sep 17 00:00:00 2001 From: Hayden Roche Date: Tue, 3 Oct 2023 17:13:59 -0700 Subject: [PATCH 3/4] Address review feedback. --- notecard/notecard.py | 26 ++++++++--- test/test_i2c.py | 105 ++++++++++++++++++++++++++---------------- test/test_notecard.py | 2 +- test/test_serial.py | 10 ++++ 4 files changed, 96 insertions(+), 47 deletions(-) diff --git a/notecard/notecard.py b/notecard/notecard.py index 15fdeb9..1b15f3e 100644 --- a/notecard/notecard.py +++ b/notecard/notecard.py @@ -572,7 +572,12 @@ def __init__(self, uart_id, debug=False): if sys.implementation.name == 'micropython': self._available = self._available_micropython else: - self._available = self._available_default + if hasattr(self.uart, 'in_waiting'): + self._available = self._available_default + else: + raise NotImplementedError(('Serial communications with the ' + 'Notecard are not supported for this' + ' platform.')) self.Reset() @@ -604,7 +609,12 @@ def _read(self, length): # The rest is the data. data = read_buf[2:] - return (available, data_len, data) + if len(data) != data_len: + raise Exception(('Serial-over-I2C error: reported data length ' + f'({data_len}) differs from actual data length' + f' ({len(data)}).')) + + return available, data def _write(self, data): """Perform a serial-over-I2C write.""" @@ -620,8 +630,8 @@ def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, received_data = bytearray() while True: - available, data_len, data = self._read(read_len) - if data_len > 0: + available, data = self._read(read_len) + if len(data) > 0: received_data += data timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC @@ -670,6 +680,8 @@ def transmit(self, data, delay=True): data_left -= chunk_len sent_in_seg += chunk_len + # We delay for CARD_REQUEST_SEGMENT_DELAY_MS ms every time a full + # "segment" of data has been transmitted. if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN @@ -693,7 +705,7 @@ def _transact(self, req_bytes, rsp_expected, start = start_timeout() available = 0 while available == 0: - available, _, _ = self._read(0) + available, _ = self._read(0) if timeout_secs != 0 and has_timed_out(start, timeout_secs): raise Exception(('Timed out while querying Notecard for ' @@ -730,14 +742,14 @@ def Reset(self): read_len = 0 while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): try: - available, data_len, data = self._read(read_len) + available, data = self._read(read_len) except Exception as e: if self._debug: print(e) time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) continue - if data_len > 0: + if len(data) > 0: something_found = True # The Notecard responds to a bare `\n` with `\r\n`. If # we get any other characters back, it means the host diff --git a/test/test_i2c.py b/test/test_i2c.py index 144fc95..acbd46f 100644 --- a/test/test_i2c.py +++ b/test/test_i2c.py @@ -1,6 +1,7 @@ import os import sys import pytest +import re from unittest.mock import MagicMock, patch from .unit_test_utils import TrueOnNthIteration, BooleanToggle @@ -59,12 +60,29 @@ def _arrange_transact_test(): yield _arrange_transact_test +@pytest.fixture +def arrange_read_test(arrange_test): + def _arrange_read_test(available, data_len, data): + def _platform_read_side_effect(initiate_read_msg, read_buf): + read_buf[0] = available + read_buf[1] = data_len + read_buf[2:] = data + + card = arrange_test() + card._platform_read = MagicMock( + side_effect=_platform_read_side_effect) + + return card + + yield _arrange_read_test + + class TestI2C: # Reset tests. def test_reset_succeeds_on_good_notecard_response( self, arrange_reset_test): card = arrange_reset_test() - card._read.return_value = (0, 2, b'\r\n') + card._read.return_value = (0, b'\r\n') with patch('notecard.notecard.has_timed_out', side_effect=TrueOnNthIteration(2)): @@ -75,7 +93,7 @@ def test_reset_succeeds_on_good_notecard_response( def test_reset_sends_a_newline_to_clear_stale_response( self, arrange_reset_test): card = arrange_reset_test() - card._read.return_value = (0, 2, b'\r\n') + card._read.return_value = (0, b'\r\n') with patch('notecard.notecard.has_timed_out', side_effect=TrueOnNthIteration(2)): @@ -85,7 +103,7 @@ def test_reset_sends_a_newline_to_clear_stale_response( def test_reset_locks_and_unlocks(self, arrange_reset_test): card = arrange_reset_test() - card._read.return_value = (0, 2, b'\r\n') + card._read.return_value = (0, b'\r\n') with patch('notecard.notecard.has_timed_out', side_effect=TrueOnNthIteration(2)): @@ -220,10 +238,10 @@ def test_receive_returns_all_data_bytes_from_read(self, arrange_test): card._read.side_effect = [ # There are 4 bytes available to read, and there are no more bytes # to read in this packet. - (4, 0, None), + (4, bytearray()), # 0 bytes available to read after this packet. 4 coming in this # packet, and they are {}\r\n. - (0, 4, payload) + (0, payload) ] rx_data = card.receive() @@ -239,13 +257,13 @@ def test_receive_keeps_reading_if_data_available_after_newline( card._read.side_effect = [ # There are 4 bytes available to read, and there are no more bytes # to read in this packet. - (4, 0, None), + (4, bytearray()), # 2 bytes available to read after this packet. 4 coming in this # packet, and they are {}\r\n. - (2, 4, payload), + (2, payload), # 0 bytes after this packet. 2 coming in this packet, and they are # io. - (0, 2, excess_data) + (0, excess_data) ] rx_data = card.receive() @@ -259,11 +277,11 @@ def test_receive_raises_exception_on_timeout(self, arrange_test): card._read.side_effect = [ # There are 3 bytes available to read, and there are no more bytes # to read in this packet. - (4, 0, None), + (3, bytearray()), # 0 bytes available to read after this packet. 3 coming in this # packet, and they are {}\r. The lack of a newline at the end will # cause this test to hit the timeout. - (0, 3, payload) + (0, payload) ] with patch('notecard.notecard.has_timed_out', return_value=True): @@ -330,7 +348,7 @@ def test_transact_returns_none_if_rsp_not_expected( def test_transact_returns_not_none_if_rsp_expected( self, arrange_transact_test): card, req_bytes = arrange_transact_test() - card._read = MagicMock(return_value=(1, None, None)) + card._read = MagicMock(return_value=(1, bytearray())) rsp = card._transact(req_bytes, rsp_expected=True) @@ -339,7 +357,7 @@ def test_transact_returns_not_none_if_rsp_expected( def test_transact_calls_receive_if_rsp_expected( self, arrange_transact_test): card, req_bytes = arrange_transact_test() - card._read = MagicMock(return_value=(1, None, None)) + card._read = MagicMock(return_value=(1, bytearray())) card._transact(req_bytes, rsp_expected=True) @@ -347,7 +365,7 @@ def test_transact_calls_receive_if_rsp_expected( def test_transact_raises_exception_on_timeout(self, arrange_transact_test): card, req_bytes = arrange_transact_test() - card._read = MagicMock(return_value=(0, None, None)) + card._read = MagicMock(return_value=(0, bytearray())) # Force a timeout. with patch('notecard.notecard.has_timed_out', @@ -357,55 +375,64 @@ def test_transact_raises_exception_on_timeout(self, arrange_transact_test): 'available data.')): card._transact(req_bytes, rsp_expected=True) - # _read tests - def test_read_sends_the_initial_read_packet_correctly(self, arrange_test): - card = arrange_test() - card._platform_read = MagicMock() - length = 12 + # _read tests. + def test_read_sends_the_initial_read_packet_correctly( + self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) # To start a read from the Notecard using serial-over-I2C, the host # should send a 0 byte followed by a byte with the requested read # length. expected_packet = bytearray(2) expected_packet[0] = 0 - expected_packet[1] = length + expected_packet[1] = data_len - card._read(length) + card._read(data_len) card._platform_read.assert_called_once() assert card._platform_read.call_args[0][0] == expected_packet - def test_read_sizes_read_buf_correctly(self, arrange_test): - card = arrange_test() - card._platform_read = MagicMock() - header_length = 2 - data_length = 12 - expected_read_buffer_length = header_length + data_length + def test_read_sizes_read_buf_correctly(self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) + header_len = 2 + expected_read_buffer_len = header_len + data_len - card._read(data_length) + card._read(data_len) card._platform_read.assert_called_once() - assert card._platform_read.call_args[0][1] == bytearray(expected_read_buffer_length) + assert len(card._platform_read.call_args[0][1]) == \ + expected_read_buffer_len - def test_read_parses_data_correctly(self, arrange_test): + def test_read_parses_data_correctly(self, arrange_read_test): available = 8 data_len = 4 data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) - def platform_read_side_effect(initiate_read_msg, read_buf): - read_buf[0] = available - read_buf[1] = data_len - read_buf[2:] = data - - card = arrange_test() - card._platform_read = MagicMock(side_effect=platform_read_side_effect) - - actual_available, actual_data_len, actual_data = card._read(len(data)) + actual_available, actual_data = card._read(len(data)) card._platform_read.assert_called_once() assert actual_available == available - assert actual_data_len == data_len assert actual_data == data + def test_read_raises_exception_if_data_length_does_not_match_data( + self, arrange_read_test): + available = 8 + # The reported length is 5, but the actual length is 4. + data_len = 5 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) + + exception_msg = re.escape(('Serial-over-I2C error: reported data length' + f' ({data_len}) differs from actual data ' + f'length ({len(data)}).')) + with pytest.raises(Exception, match=exception_msg): + card._read(len(data)) + + # _write tests. def test_write_calls_platform_write_correctly(self, arrange_test): card = arrange_test() card._platform_write = MagicMock() diff --git a/test/test_notecard.py b/test/test_notecard.py index fcf8416..4cdbb8d 100644 --- a/test/test_notecard.py +++ b/test/test_notecard.py @@ -368,7 +368,7 @@ def test_command_returns_none(self): card = notecard.Notecard() card.Transaction = MagicMock() - rsp = card.Command({'cmd': 'card.sleep'}) + rsp = card.Command({'cmd': 'hub.set'}) # A command generates no response, by definition. assert rsp is None diff --git a/test/test_serial.py b/test/test_serial.py index 5f6b1e1..198e47e 100644 --- a/test/test_serial.py +++ b/test/test_serial.py @@ -155,6 +155,16 @@ def test_init_creates_appropriate_lock_type( assert isinstance(card.lock_handle, lock_type) + def test_init_fails_if_not_micropython_and_uart_has_no_in_waiting_attr( + self): + exception_msg = ('Serial communications with the Notecard are not ' + 'supported for this platform.') + + with patch('notecard.notecard.sys.implementation.name', new='cpython'): + with patch('notecard.notecard.OpenSerial.Reset'): + with pytest.raises(Exception, match=exception_msg): + notecard.OpenSerial(42) + @pytest.mark.parametrize( 'platform,available_method', [ From 1f783b87a9c8b9ea8918e2d6c31b19a187258986 Mon Sep 17 00:00:00 2001 From: Hayden Roche Date: Tue, 17 Oct 2023 14:03:08 -0700 Subject: [PATCH 4/4] Fix issue with multiline f-string concatenation in Micro|CircuitPython. See https://forum.micropython.org/viewtopic.php?f=2&t=11114. I also went ahead and changed to the new format for multiline strings that don't contain f-strings. --- Makefile | 2 +- notecard/notecard.py | 60 ++++++++++++++++++++++---------------------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index be6237e..5ddaf78 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ flake8: # F403 'from module import *' used; unable to detect undefined names https://www.flake8rules.com/rules/F403.html # W503 Line break occurred before a binary operator https://www.flake8rules.com/rules/W503.html # E501 Line too long (>79 characters) https://www.flake8rules.com/rules/E501.html - ${PYTHON} -m flake8 test/ notecard/ examples/ mpy_board/ --count --ignore=E722,F401,F403,W503,E501 --show-source --statistics + ${PYTHON} -m flake8 test/ notecard/ examples/ mpy_board/ --count --ignore=E722,F401,F403,W503,E501,E502 --show-source --statistics coverage: ${RUN_VENV_ACTIVATE} diff --git a/notecard/notecard.py b/notecard/notecard.py index 1b15f3e..357a3e0 100644 --- a/notecard/notecard.py +++ b/notecard/notecard.py @@ -184,8 +184,8 @@ def _crc_error(self, rsp_bytes): if seq_number != expected_seq_number: if self._debug: - print(('Sequence number mismatch. Expected ' - f'{expected_seq_number}, received {seq_number}.')) + print('Sequence number mismatch. Expected ' + \ + f'{expected_seq_number}, received {seq_number}.') return True elif crc != computed_crc: if self._debug: @@ -245,8 +245,8 @@ def _transaction_timeout_seconds(self, req): elif 'cmd' in req: req_key = 'cmd' else: - raise Exception(('Malformed request. Missing \'req\' or \'cmd\' ' - f'field: {req}.')) + raise Exception('Malformed request. Missing \'req\' or \'cmd\' ' + \ + f'field: {req}.') if req[req_key] == 'note.add': if 'milliseconds' in req: @@ -328,8 +328,8 @@ def Transaction(self, req, lock=True): if 'err' in rsp_json: if '{io}' in rsp_json['err']: if self._debug: - print(('Response has error field indicating I/O' - f' error: {rsp_json}')) + print('Response has error field indicating ' + \ + f'I/O error: {rsp_json}') error = True retries_left -= 1 @@ -337,8 +337,8 @@ def Transaction(self, req, lock=True): continue elif '{bad-bin}' in rsp_json['err']: if self._debug: - print(('Response has error field indicating ' - f'binary I/O error: {rsp_json}')) + print('Response has error field indicating ' + \ + f'binary I/O error: {rsp_json}') print('Not eligible for retry.') error = True @@ -415,8 +415,8 @@ def _transact(self, req_bytes, rsp_expected, start = start_timeout() while not self._available(): if timeout_secs != 0 and has_timed_out(start, timeout_secs): - raise Exception(('Timed out while querying Notecard for ' - 'available data.')) + raise Exception('Timed out while querying Notecard for ' + \ + 'available data.') # Delay for 10 ms before checking for available data again. time.sleep(.01) @@ -433,8 +433,8 @@ def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, while not received_newline: while not self._available(): if timeout_secs != 0 and has_timed_out(start, timeout_secs): - raise Exception(('Timed out waiting to receive data from ' - 'Notecard.')) + raise Exception('Timed out waiting to receive data from' + \ + ' Notecard.') # Sleep while awaiting the first byte (lazy). After the first # byte, start to spin for the remaining bytes (greedy). @@ -521,13 +521,13 @@ def Reset(self): if not something_found: if self._debug: - print(('Notecard not responding to newline during ' - 'reset.')) + print('Notecard not responding to newline during ' + \ + 'reset.') elif non_control_char_found: if self._debug: - print(('Received non-control characters from the ' - 'Notecard during reset.')) + print('Received non-control characters from the ' + \ + 'Notecard during reset.') else: # If all we got back is newlines, we're in sync with the # Notecard. @@ -575,9 +575,9 @@ def __init__(self, uart_id, debug=False): if hasattr(self.uart, 'in_waiting'): self._available = self._available_default else: - raise NotImplementedError(('Serial communications with the ' - 'Notecard are not supported for this' - ' platform.')) + raise NotImplementedError('Serial communications with the ' + \ + 'Notecard are not supported for ' + \ + 'this platform.') self.Reset() @@ -610,9 +610,9 @@ def _read(self, length): data = read_buf[2:] if len(data) != data_len: - raise Exception(('Serial-over-I2C error: reported data length ' - f'({data_len}) differs from actual data length' - f' ({len(data)}).')) + raise Exception('Serial-over-I2C error: reported data length ' + \ + f'({data_len}) differs from actual data length' + \ + f' ({len(data)}).') return available, data @@ -653,8 +653,8 @@ def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, break if timeout_secs != 0 and has_timed_out(start, timeout_secs): - raise Exception(('Timed out while reading data from the ' - 'Notecard.')) + raise Exception('Timed out while reading data from the ' + \ + 'Notecard.') if delay: time.sleep(0.05) @@ -708,8 +708,8 @@ def _transact(self, req_bytes, rsp_expected, available, _ = self._read(0) if timeout_secs != 0 and has_timed_out(start, timeout_secs): - raise Exception(('Timed out while querying Notecard for ' - 'available data.')) + raise Exception('Timed out while querying Notecard for ' + \ + 'available data.') return self.receive() @@ -768,13 +768,13 @@ def Reset(self): if not something_found: if self._debug: - print(('Notecard not responding to newline during ' - 'reset.')) + print('Notecard not responding to newline during ' + \ + 'reset.') time.sleep(.005) elif non_control_char_found: if self._debug: - print(('Received non-control characters from the ' - 'Notecard during reset.')) + print('Received non-control characters from the ' + \ + 'Notecard during reset.') else: # If all we got back is newlines, we're in sync with the # Notecard.