Skip to content

Commit

Permalink
Add type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
Janos Kutscherauer committed Nov 21, 2021
1 parent 1f5da6b commit 8754a5f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 49 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 codecov
python -m pip install flake8 codecov mypy
pip install -r tests/requirements.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 .
- name: Typecheck with mypy
run: |
mypy -p ina219 --strict
- name: Test with codecov
run: |
coverage run --branch --source=ina219 -m unittest discover -s tests -p 'test_*.py'
Expand Down
8 changes: 5 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ python:
# command to install dependencies
install:
- pip install flake8
- pip install mypy
- pip install codecov
- pip install -r tests/requirements.txt
# check PEP8 coding standard with flake8
# check PEP8 coding standard with flake8 and typing with mypy
before_script:
flake8 .
- flake8 .
- mypy -p ina219 --strict
# command to run tests
script:
script:
- coverage run --branch --source=ina219 -m unittest discover -s tests -p 'test_*.py'
- coverage xml
after_success:
Expand Down
98 changes: 53 additions & 45 deletions ina219.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import logging
import time
from math import trunc
import Adafruit_GPIO.I2C as I2C
from typing import cast, List, Optional
import Adafruit_GPIO.I2C as I2C # type: ignore


class INA219:
Expand Down Expand Up @@ -90,9 +91,10 @@ class INA219:
# to guarantee that current overflow can always be detected.
__CURRENT_LSB_FACTOR = 32800

def __init__(self, shunt_ohms, max_expected_amps=None,
busnum=None, address=__ADDRESS,
log_level=logging.ERROR):
def __init__(self, shunt_ohms: float,
max_expected_amps: Optional[float] = None,
busnum: Optional[int] = None, address: int = __ADDRESS,
log_level: int = logging.ERROR) -> None:
"""Construct the class.
Pass in the resistance of the shunt resistor and the maximum expected
Expand All @@ -117,11 +119,12 @@ def __init__(self, shunt_ohms, max_expected_amps=None,
self._shunt_ohms = shunt_ohms
self._max_expected_amps = max_expected_amps
self._min_device_current_lsb = self._calculate_min_current_lsb()
self._gain = None
self._gain: Optional[int] = None
self._auto_gain_enabled = False

def configure(self, voltage_range=RANGE_32V, gain=GAIN_AUTO,
bus_adc=ADC_12BIT, shunt_adc=ADC_12BIT):
def configure(self, voltage_range: int = RANGE_32V, gain: int = GAIN_AUTO,
bus_adc: int = ADC_12BIT,
shunt_adc: int = ADC_12BIT) -> None:
"""Configure and calibrate how the INA219 will take measurements.
Arguments:
Expand Down Expand Up @@ -175,87 +178,88 @@ def configure(self, voltage_range=RANGE_32V, gain=GAIN_AUTO,
self._max_expected_amps)
self._configure(voltage_range, self._gain, bus_adc, shunt_adc)

def voltage(self):
def voltage(self) -> float:
"""Return the bus voltage in volts."""
value = self._voltage_register()
return float(value) * self.__BUS_MILLIVOLTS_LSB / 1000

def supply_voltage(self):
def supply_voltage(self) -> float:
"""Return the bus supply voltage in volts.
This is the sum of the bus voltage and shunt voltage. A
DeviceRangeError exception is thrown if current overflow occurs.
"""
return self.voltage() + (float(self.shunt_voltage()) / 1000)

def current(self):
def current(self) -> float:
"""Return the bus current in milliamps.
A DeviceRangeError exception is thrown if current overflow occurs.
"""
self._handle_current_overflow()
return self._current_register() * self._current_lsb * 1000

def power(self):
def power(self) -> float:
"""Return the bus power consumption in milliwatts.
A DeviceRangeError exception is thrown if current overflow occurs.
"""
self._handle_current_overflow()
return self._power_register() * self._power_lsb * 1000

def shunt_voltage(self):
def shunt_voltage(self) -> float:
"""Return the shunt voltage in millivolts.
A DeviceRangeError exception is thrown if current overflow occurs.
"""
self._handle_current_overflow()
return self._shunt_voltage_register() * self.__SHUNT_MILLIVOLTS_LSB

def sleep(self):
def sleep(self) -> None:
"""Put the INA219 into power down mode."""
configuration = self._read_configuration()
self._configuration_register(configuration & 0xFFF8)

def wake(self):
def wake(self) -> None:
"""Wake the INA219 from power down mode."""
configuration = self._read_configuration()
self._configuration_register(configuration | 0x0007)
# 40us delay to recover from powerdown (p14 of spec)
time.sleep(0.00004)

def current_overflow(self):
def current_overflow(self) -> bool:
"""Return true if the sensor has detect current overflow.
In this case the current and power values are invalid.
"""
return self._has_current_overflow()

def reset(self):
def reset(self) -> None:
"""Reset the INA219 to its default configuration."""
self._configuration_register(1 << self.__RST)

def is_conversion_ready(self):
def is_conversion_ready(self) -> bool:
"""Check if conversion of a new reading has occured."""
cnvr = self._read_voltage_register() & self.__CNVR
return (cnvr == self.__CNVR)

def _handle_current_overflow(self):
def _handle_current_overflow(self) -> None:
if self._auto_gain_enabled:
while self._has_current_overflow():
self._increase_gain()
else:
if self._has_current_overflow():
raise DeviceRangeError(self.__GAIN_VOLTS[self._gain])
raise DeviceRangeError(
self.__GAIN_VOLTS[self._gain] if self._gain else 0.0)

def _determine_gain(self, max_expected_amps):
def _determine_gain(self, max_expected_amps: float) -> int:
shunt_v = max_expected_amps * self._shunt_ohms
if shunt_v > self.__GAIN_VOLTS[3]:
raise ValueError(self.__RNG_ERR_MSG % max_expected_amps)
gain = min(v for v in self.__GAIN_VOLTS if v > shunt_v)
return self.__GAIN_VOLTS.index(gain)

def _increase_gain(self):
def _increase_gain(self) -> None:
self.logger.info(self.__LOG_MSG_3)
gain = self._read_gain()
if gain < len(self.__GAIN_VOLTS) - 1:
Expand All @@ -270,15 +274,16 @@ def _increase_gain(self):
self.logger.info('Device limit reach, gain cannot be increased')
raise DeviceRangeError(self.__GAIN_VOLTS[gain], True)

def _configure(self, voltage_range, gain, bus_adc, shunt_adc):
def _configure(self, voltage_range: int, gain: int, bus_adc: int,
shunt_adc: int) -> None:
configuration = (
voltage_range << self.__BRNG | gain << self.__PG0 |
bus_adc << self.__BADC1 | shunt_adc << self.__SADC1 |
self.__CONT_SH_BUS)
self._configuration_register(configuration)

def _calibrate(self, bus_volts_max, shunt_volts_max,
max_expected_amps=None):
def _calibrate(self, bus_volts_max: int, shunt_volts_max: float,
max_expected_amps: Optional[float] = None) -> None:
self.logger.info(
self.__LOG_MSG_2 %
(bus_volts_max, shunt_volts_max,
Expand Down Expand Up @@ -309,7 +314,8 @@ def _calibrate(self, bus_volts_max, shunt_volts_max,
"calibration: 0x%04x (%d)" % (calibration, calibration))
self._calibration_register(calibration)

def _determine_current_lsb(self, max_expected_amps, max_possible_amps):
def _determine_current_lsb(self, max_expected_amps: Optional[float],
max_possible_amps: float) -> float:
if max_expected_amps is not None:
if max_expected_amps > round(max_possible_amps, 3):
raise ValueError(self.__AMP_ERR_MSG %
Expand All @@ -327,67 +333,68 @@ def _determine_current_lsb(self, max_expected_amps, max_possible_amps):
current_lsb = self._min_device_current_lsb
return current_lsb

def _configuration_register(self, register_value):
def _configuration_register(self, register_value: int) -> None:
self.logger.debug("configuration: 0x%04x" % register_value)
self.__write_register(self.__REG_CONFIG, register_value)

def _read_configuration(self):
def _read_configuration(self) -> int:
return self.__read_register(self.__REG_CONFIG)

def _calculate_min_current_lsb(self):
def _calculate_min_current_lsb(self) -> float:
return self.__CALIBRATION_FACTOR / \
(self._shunt_ohms * self.__MAX_CALIBRATION_VALUE)

def _read_gain(self):
def _read_gain(self) -> int:
configuration = self._read_configuration()
gain = (configuration & 0x1800) >> self.__PG0
self.logger.info("gain is currently: %.2fV" % self.__GAIN_VOLTS[gain])
return gain

def _configure_gain(self, gain):
def _configure_gain(self, gain: int) -> None:
configuration = self._read_configuration()
configuration = configuration & 0xE7FF
self._configuration_register(configuration | (gain << self.__PG0))
self._gain = gain
self.logger.info("gain set to: %.2fV" % self.__GAIN_VOLTS[gain])

def _calibration_register(self, register_value):
def _calibration_register(self, register_value: int) -> None:
self.logger.debug("calibration: 0x%04x" % register_value)
self.__write_register(self.__REG_CALIBRATION, register_value)

def _has_current_overflow(self):
def _has_current_overflow(self) -> bool:
ovf = self._read_voltage_register() & self.__OVF
return (ovf == 1)

def _voltage_register(self):
def _voltage_register(self) -> int:
register_value = self._read_voltage_register()
return register_value >> 3

def _read_voltage_register(self):
def _read_voltage_register(self) -> int:
return self.__read_register(self.__REG_BUSVOLTAGE)

def _current_register(self):
def _current_register(self) -> int:
return self.__read_register(self.__REG_CURRENT, True)

def _shunt_voltage_register(self):
def _shunt_voltage_register(self) -> int:
return self.__read_register(self.__REG_SHUNTVOLTAGE, True)

def _power_register(self):
def _power_register(self) -> int:
return self.__read_register(self.__REG_POWER)

def __validate_voltage_range(self, voltage_range):
def __validate_voltage_range(self, voltage_range: int) -> None:
if voltage_range > len(self.__BUS_RANGE) - 1:
raise ValueError(self.__VOLT_ERR_MSG)

def __write_register(self, register, register_value):
def __write_register(self, register: int, register_value: int) -> None:
register_bytes = self.__to_bytes(register_value)
self.logger.debug(
"write register 0x%02x: 0x%04x 0b%s" %
(register, register_value,
self.__binary_as_string(register_value)))
self._i2c.writeList(register, register_bytes)

def __read_register(self, register, negative_value_supported=False):
def __read_register(self, register: int,
negative_value_supported: bool = False) -> int:
if negative_value_supported:
register_value = self._i2c.readS16BE(register)
else:
Expand All @@ -396,15 +403,16 @@ def __read_register(self, register, negative_value_supported=False):
"read register 0x%02x: 0x%04x 0b%s" %
(register, register_value,
self.__binary_as_string(register_value)))
return register_value
return cast(int, register_value)

def __to_bytes(self, register_value):
def __to_bytes(self, register_value: int) -> List[int]:
return [(register_value >> 8) & 0xFF, register_value & 0xFF]

def __binary_as_string(self, register_value):
def __binary_as_string(self, register_value: int) -> str:
return bin(register_value)[2:].zfill(16)

def __max_expected_amps_to_string(self, max_expected_amps):
def __max_expected_amps_to_string(self, max_expected_amps:
Optional[float]) -> str:
if max_expected_amps is None:
return ''
else:
Expand All @@ -417,7 +425,7 @@ class DeviceRangeError(Exception):
__DEV_RNG_ERR = ('Current out of range (overflow), '
'for gain %.2fV')

def __init__(self, gain_volts, device_max=False):
def __init__(self, gain_volts: float, device_max: bool = False) -> None:
"""Construct a DeviceRangeError."""
msg = self.__DEV_RNG_ERR % gain_volts
if device_max:
Expand Down

0 comments on commit 8754a5f

Please sign in to comment.