Skip to content

Commit

Permalink
lint: add almost all annotations to pass mypy --strict
Browse files Browse the repository at this point in the history
One remains, for a context manager's __exit__:

    adafruit_tinylora.py:240: error: Function is missing a type annotation for one or more arguments  [no-untyped-def]

Not sure how this should be annotated, but it's not annotated in
upstream cpython 3.12 either.

Added type aliases bytearray2, bytearray4, etc to signal that the
bytearrays passed must be 2 bytes, 4 bytes, etc.

In adafruit_tinylora_encryption.py, there's a "state" matrix for the AES
implementation there. It was mixing str and byte/int, which apparently
works fine on circuitpython but fails on cpython. Change the state
matrix to a bytearray, as was likely intended.

Fixes #50.
  • Loading branch information
samatjain committed Apr 24, 2023
1 parent 8f25b30 commit 7f4e154
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 63 deletions.
91 changes: 66 additions & 25 deletions adafruit_tinylora/adafruit_tinylora.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,29 @@

import time
from random import randint
from micropython import const

import adafruit_bus_device.spi_device
from micropython import const

from adafruit_tinylora.adafruit_tinylora_encryption import AES

try: # typing
from typing import Annotated, Optional, TypeAlias, Union

import busio.SPI
import digitalio.DigitalInOut
from typing_extensions import Self # Python <3.11

# type aliases
bytearray2: TypeAlias = Annotated[bytearray, 2]
bytearray4: TypeAlias = Annotated[bytearray, 4]
bytearray16: TypeAlias = Annotated[bytearray, 16]

registeraddress: TypeAlias = Union[const, int]
except ImportError:
pass


__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_TinyLoRa.git"

Expand Down Expand Up @@ -70,7 +89,13 @@
class TTN:
"""TTN Class"""

def __init__(self, dev_address, net_key, app_key, country="US"):
def __init__(
self,
dev_address: bytearray4,
net_key: bytearray16,
app_key: bytearray16,
country: str = "US",
):
"""Interface for TheThingsNetwork
:param bytearray dev_address: TTN Device Address.
:param bytearray net_key: TTN Network Key.
Expand All @@ -83,22 +108,22 @@ def __init__(self, dev_address, net_key, app_key, country="US"):
self.region = country

@property
def country(self):
def country(self) -> str:
"""Returns the TTN Frequency Country."""
return self.region

@property
def device_address(self):
def device_address(self) -> bytearray4:
"""Returns the TTN Device Address."""
return self.dev_addr

@property
def application_key(self):
def application_key(self) -> bytearray16:
"""Returns the TTN Application Key."""
return self.app_key

@property
def network_key(self):
def network_key(self) -> bytearray16:
"""Returns the TTN Network Key."""
return self.net_key

Expand All @@ -108,10 +133,18 @@ class TinyLoRa:
"""TinyLoRa Interface"""

# SPI Write Buffer
_BUFFER = bytearray(2)
_BUFFER: bytearray2 = bytearray(2)

# pylint: disable=too-many-arguments,invalid-name
def __init__(self, spi, cs, irq, rst, ttn_config, channel=None):
def __init__(
self,
spi: busio.SPI,
cs: digitalio.DigitalInOut,
irq: digitalio.DigitalInOut,
rst: digitalio.DigitalInOut,
ttn_config: digitalio.DigitalInOut,
channel: Optional[int] = None,
):
"""Interface for a HopeRF RFM95/6/7/8(w) radio module. Sets module up for sending to
The Things Network.
Expand Down Expand Up @@ -141,13 +174,13 @@ def __init__(self, spi, cs, irq, rst, ttn_config, channel=None):
if self._version != 18:
raise TypeError("Can not detect LoRa Module. Please check wiring!")
# Set Frequency registers
self._rfm_msb = None
self._rfm_mid = None
self._rfm_lsb = None
self._rfm_msb: Optional[registeraddress] = None
self._rfm_mid: Optional[registeraddress] = None
self._rfm_lsb: Optional[registeraddress] = None
# Set datarate registers
self._sf = None
self._bw = None
self._modemcfg = None
self._sf: Optional[registeraddress] = None
self._bw: Optional[registeraddress] = None
self._modemcfg: Optional[registeraddress] = None
self.set_datarate("SF7BW125")
# Set regional frequency plan
# pylint: disable=import-outside-toplevel
Expand Down Expand Up @@ -201,13 +234,13 @@ def __init__(self, spi, cs, irq, rst, ttn_config, channel=None):
# Give the lora object ttn configuration
self._ttn_config = ttn_config

def __enter__(self):
def __enter__(self) -> Self:
return self

def __exit__(self, exception_type, exception_value, traceback):
def __exit__(self, exception_type, exception_value, traceback) -> None:
self.deinit()

def deinit(self):
def deinit(self) -> None:
"""Deinitializes the TinyLoRa object properties and pins."""
self._irq = None
self._rst = None
Expand All @@ -220,7 +253,9 @@ def deinit(self):
self._bw = None
self._modemcfg = None

def send_data(self, data, data_length, frame_counter, timeout=2):
def send_data(
self, data: bytearray, data_length: int, frame_counter: int, timeout: int = 2
) -> None:
"""Function to assemble and send data
:param data: data to send
:param data_length: length of data to send
Expand Down Expand Up @@ -258,15 +293,15 @@ def send_data(self, data, data_length, frame_counter, timeout=2):
# recalculate packet length
lora_pkt_len += data_length
# Calculate MIC
mic = bytearray(4)
mic: bytearray4 = bytearray(4)
mic = aes.calculate_mic(lora_pkt, lora_pkt_len, mic)
# load mic in package
lora_pkt[lora_pkt_len : lora_pkt_len + 4] = mic[0:4]
# recalculate packet length (add MIC length)
lora_pkt_len += 4
self.send_packet(lora_pkt, lora_pkt_len, timeout)

def send_packet(self, lora_packet, packet_length, timeout):
def send_packet(self, lora_packet: bytearray, packet_length: int, timeout: int) -> None:
"""Sends a LoRa packet using the RFM Module
:param bytearray lora_packet: assembled LoRa packet from send_data
:param int packet_length: length of LoRa packet to send
Expand Down Expand Up @@ -312,10 +347,11 @@ def send_packet(self, lora_packet, packet_length, timeout):
if timed_out:
raise RuntimeError("Timeout during packet send")

def set_datarate(self, datarate):
def set_datarate(self, datarate: str) -> None:
"""Sets the RFM Datarate
:param datarate: Bandwidth and Frequency Plan
"""
# TODO: Convert these to enum
data_rates = {
"SF7BW125": (0x74, 0x72, 0x04),
"SF7BW250": (0x74, 0x82, 0x04),
Expand All @@ -330,13 +366,15 @@ def set_datarate(self, datarate):
except KeyError as err:
raise KeyError("Invalid or Unsupported Datarate.") from err

def set_channel(self, channel):
def set_channel(self, channel: int) -> None:
"""Sets the RFM Channel (if single-channel)
:param int channel: Transmit Channel (0 through 7).
"""
self._rfm_msb, self._rfm_mid, self._rfm_lsb = self._frequencies[channel]

def _read_into(self, address, buf, length=None):
def _read_into(
self, address: registeraddress, buf: bytearray2, length: Optional[int] = None
) -> None:
"""Read a number of bytes from the specified address into the
provided buffer. If length is not specified (default) the entire buffer
will be filled.
Expand All @@ -353,14 +391,14 @@ def _read_into(self, address, buf, length=None):
device.write(self._BUFFER, end=1)
device.readinto(buf, end=length)

def _read_u8(self, address):
def _read_u8(self, address: registeraddress) -> int:
"""Read a single byte from the provided address and return it.
:param bytearray address: Register Address.
"""
self._read_into(address, self._BUFFER, length=1)
return self._BUFFER[0]

def _write_u8(self, address, val):
def _write_u8(self, address: registeraddress, val: int) -> None:
"""Writes to the RFM register given an address and data.
:param bytearray address: Register Address.
:param val: Data to write.
Expand All @@ -370,3 +408,6 @@ def _write_u8(self, address, val):
self._BUFFER[1] = val
# pylint: disable=no-member
device.write(self._BUFFER, end=2)
# pylint: disable=no-member
device.write(self._BUFFER, end=2)
device.write(self._BUFFER, end=2)
Loading

0 comments on commit 7f4e154

Please sign in to comment.