Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
cover/
coverage/
tests/coverage_report/

# Translations
Expand Down
26 changes: 21 additions & 5 deletions docs/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,33 @@ done
pdm install
```

### perform unit tests:
### testing

The entire test suite (unit tests & compatibility tests) can be triggered
by running

```SHELL
pdm run pytest
scripts/run_tests
```

### perform compatibility tests:
You have successfully run all tests when the output ends with:

```SHELL
tox
```
----------------------------------------------------------------------
If you reached this far you passed. Congratulations!
----------------------------------------------------------------------
```

If the unit tests have run successfully coverage file in LCOV format is
generated and stored at `coverage/lcov.info`. This file can be used by
your IDE to indicate code coverage from within your editor window.
- We recommend the Visual Studio Code extension
[Code Coverage](https://marketplace.visualstudio.com/items?itemName=markis.code-coverage).
- Special care should be taken when modifying code that is not covered by unit tests!

Branches that do not pass the test harness (e.g. due to failing unit tests
or lowering the code coverage beneath the desired threshold) should not be
pull-requested.

### use the demo script

Expand Down
57 changes: 27 additions & 30 deletions feeph/i2c/burst_handler.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,18 @@ def read_register(self, register: int, byte_count: int = 1, max_tries: int = 5)
buf_r = bytearray(byte_count)
buf_r[0] = register
buf_w = bytearray(byte_count)
cur_try = 0
for cur_try in range(1, 1 + max_tries):
try:
self._i2c_bus.writeto_then_readfrom(address=self._i2c_adr, buffer_out=buf_r, buffer_in=buf_w)
return convert_bytearry_to_uint(buf_w)
except OSError as e:
# protect against sporadic errors on actual devices
# (maybe we can do something to prevent these errors?)
except (OSError, RuntimeError) as e:
# [Errno 121] Remote I/O error
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
time.sleep(0.001)
except RuntimeError as e:
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
time.sleep(0.001)
else:
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")

def write_register(self, register: int, value: int, byte_count: int = 1, max_tries: int = 3):
"""
Expand All @@ -91,23 +90,21 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
"""
_validate_register_address(register)
ba = convert_uint_to_bytearry(value, byte_count)
buf = bytearray(1 + len(ba))
buf[0] = register
for i in range(len(ba)):
buf[1+i] = ba[i]
buf = bytearray([register])
for byte in ba:
buf.append(byte)
cur_try = 0
for cur_try in range(1, 1 + max_tries):
try:
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
return
except OSError as e:
# protect against sporadic errors on actual devices
# (maybe we can do something to prevent these errors?)
except (OSError, RuntimeError) as e:
# [Errno 121] Remote I/O error
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
LH.warning("[%s] Unable to write register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
time.sleep(0.1)
except RuntimeError as e:
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
time.sleep(0.1)
else:
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")

# it is unclear if it's possible to have a multi-byte state registers
# (a register write looks exactly like a multi-byte state write)
Expand All @@ -124,19 +121,18 @@ def get_state(self, byte_count: int = 1, max_tries: int = 5) -> int:
LH.warning("Multi byte reads are not implemented yet! Returning a single byte instead.")
byte_count = 1
buf = bytearray(byte_count)
cur_try = 0
for cur_try in range(1, 1 + max_tries):
try:
self._i2c_bus.readfrom_into(address=self._i2c_adr, buffer=buf)
return buf[0]
except OSError as e:
# protect against sporadic errors on actual devices
# (maybe we can do something to prevent these errors?)
except (OSError, RuntimeError) as e:
# [Errno 121] Remote I/O error
LH.warning("[%s] Failed to read state (%i/%i): %s", __name__, cur_try, max_tries, e)
time.sleep(0.001)
except RuntimeError as e:
LH.warning("[%s] Unable to read state (%i/%i): %s", __name__, cur_try, max_tries, e)
time.sleep(0.001)
else:
raise RuntimeError(f"Unable to read state after {cur_try} attempts. Giving up.")
raise RuntimeError(f"Unable to read state after {cur_try} attempts. Giving up.")

def set_state(self, value: int, byte_count: int = 1, max_tries: int = 3):
"""
Expand All @@ -149,19 +145,18 @@ def set_state(self, value: int, byte_count: int = 1, max_tries: int = 3):
LH.warning("Multi byte writes are not implemented yet! Returning a single byte instead.")
byte_count = 1
buf = convert_uint_to_bytearry(value, byte_count)
cur_try = 0
for cur_try in range(1, 1 + max_tries):
try:
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
return
except OSError as e:
# protect against sporadic errors on actual devices
# (maybe we can do something to prevent these errors?)
except (OSError, RuntimeError) as e:
# [Errno 121] Remote I/O error
LH.warning("[%s] Failed to write state (%i/%i): %s", __name__, cur_try, max_tries, e)
LH.warning("[%s] Unable to write state (%i/%i): %s", __name__, cur_try, max_tries, e)
time.sleep(0.1)
except RuntimeError as e:
LH.warning("[%s] Unable to write state 0x%02X (%i/%i): %s", __name__, cur_try, max_tries, e)
time.sleep(0.1)
else:
raise RuntimeError(f"Unable to write state after {cur_try} attempts. Giving up.")
raise RuntimeError(f"Unable to write state after {cur_try} attempts. Giving up.")


class BurstHandler:
Expand All @@ -182,6 +177,8 @@ def __init__(self, i2c_bus: busio.I2C, i2c_adr: int, timeout_ms: int | None = 50
self._timeout_ms = timeout_ms
else:
raise ValueError("Provided timeout is not a positive integer or 'None'!")
# register '_timestart_ns' - we will populate it later on
self._timestart_ns = 0

def __enter__(self) -> BurstHandle:
"""
Expand Down
39 changes: 33 additions & 6 deletions feeph/i2c/emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ class EmulatedI2C(busio.I2C):
(e.g. duplicated registers with multiple addresses)
"""

# We intentionally do not call super().init() because we don't want to
# call any of the hardware initialization code. We are only interested
# in being able to claim we are an instance of 'busio.I2C' in case
# something else tries to validate that or has logic tied to it.
# pylint: disable=super-init-not-called
def __init__(self, state: dict[int, dict[int, int]], lock_chance: int = 100):
"""
initialize a simulated I2C bus
Expand All @@ -39,7 +44,7 @@ def __init__(self, state: dict[int, dict[int, int]], lock_chance: int = 100):

def try_lock(self) -> bool:
# may randomly fail to acquire a lock
return (random.randint(0, 100) < self._lock_chance)
return random.randint(0, 100) < self._lock_chance

def unlock(self):
pass
Expand All @@ -48,24 +53,41 @@ def unlock(self):
# being read/written since the register address must be positive in the
# range 0 ≤ x ≤ 255.

def readfrom_into(self, address, buffer, *, start=0, end=None, stop=True):
# replicate the signature of busio.I2C
# pylint: disable=too-many-arguments
def readfrom_into(self, address: int, buffer: bytearray, *, start=0, end=None, stop=True):
"""
read device state

(buffer is used as an output parameter)
"""
# make sure that buffer truly is a bytearray
# (we really want this to be a bytearray because bytearray performs
# its own input validation and automatically guarantees we're not
# getting negative byte values or other unexpected data as input)
if not isinstance(buffer, bytearray):
raise ValueError("buffer must be of type 'bytearray'")
i2c_device_address = address
i2c_device_register = -1
value = self._state[i2c_device_address][i2c_device_register]
ba = convert_uint_to_bytearry(value, len(buffer))
# copy computed result to output parameter
# pylint: disable=consider-using-enumerate
for i in range(len(buffer)):
buffer[i] = ba[i]

# replicate the signature of busio.I2C
# pylint: disable=too-many-arguments
def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
"""
write device state or register
"""
# make sure that buffer truly is a bytearray
# (we really want this to be a bytearray because bytearray performs
# its own input validation and automatically guarantees we're not
# getting negative byte values or other unexpected data as input)
if not isinstance(buffer, bytearray):
raise ValueError("buffer must be of type 'bytearray'")
if len(buffer) == 1:
# device status
i2c_device_address = address
Expand All @@ -75,8 +97,6 @@ def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
# device register
i2c_device_address = address
i2c_device_register = buffer[0]
if i2c_device_register < 0:
raise ValueError("device register can't be negative")
value = convert_bytearry_to_uint(buffer[1:])
self._state[i2c_device_address][i2c_device_register] = value

Expand All @@ -86,12 +106,19 @@ def writeto_then_readfrom(self, address: int, buffer_out: bytearray, buffer_in:

(buffer_in is used as an output parameter)
"""
# make sure that buffer_in and buffer_out truly are a bytearray
# (we really want this to be a bytearray because bytearray performs
# its own input validation and automatically guarantees we're not
# getting negative byte values or other unexpected data as input)
if not isinstance(buffer_in, bytearray):
raise ValueError("buffer_in must be of type 'bytearray'")
if not isinstance(buffer_out, bytearray):
raise ValueError("buffer_out must be of type 'bytearray'")
i2c_device_address = address
i2c_device_register = buffer_out[0]
if i2c_device_register < 0:
raise ValueError("device register can't be negative")
value = self._state[i2c_device_address][i2c_device_register]
ba = convert_uint_to_bytearry(value, len(buffer_in))
# copy computed result to output parameter
# pylint: disable=consider-using-enumerate
for i in range(len(buffer_in)):
buffer_in[i] = ba[i]
32 changes: 0 additions & 32 deletions feeph/i2c/utility.py

This file was deleted.

33 changes: 32 additions & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,14 @@ source-includes = ["tests/"]
dev = [
]
tools = [
"autopep8 ~= 2.2",
"copier ~= 9.3",
"flake8 ~= 7.0",
"mypy ~= 1.10",
"pylint ~= 3.2",
"pytest-cov ~= 5.0",
"pytest-sugar ~= 1.0",
"autopep8 ~= 2.2",
"copier ~= 9.3",
"coverage-lcov ~= 0.3",
"flake8 ~= 7.0",
"mypy ~= 1.10",
"pylint ~= 3.2",
"pytest-cov ~= 5.0",
"pytest-sugar ~= 1.0",
]

# during development: fetch all packages in our namespace from TestPyPI
Expand Down
6 changes: 6 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ astroid==3.2.4 \
autopep8==2.3.1 \
--hash=sha256:8d6c87eba648fdcfc83e29b788910b8643171c395d9c4bcf115ece035b9c9dda \
--hash=sha256:a203fe0fcad7939987422140ab17a930f684763bf7335bdb6709991dd7ef6c2d
click==8.1.7 \
--hash=sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28 \
--hash=sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de
colorama==0.4.6 \
--hash=sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44 \
--hash=sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6
Expand Down Expand Up @@ -49,6 +52,9 @@ coverage==7.6.1 \
--hash=sha256:f5796e664fe802da4f57a168c85359a8fbf3eab5e55cd4e4569fbacecc903959 \
--hash=sha256:fc5a77d0c516700ebad189b587de289a20a78324bc54baee03dd486f0855d234 \
--hash=sha256:fd21f6ae3f08b41004dfb433fa895d858f3f5979e7762d052b12aef444e29afc
coverage-lcov==0.3.0 \
--hash=sha256:0b352da0c72eacd555de2e10fa75ec165b8849ab6827218abc3ef6be2ec861f6 \
--hash=sha256:23ae34a535f1ed3fa4cdf9682f6c1fe1a40aa98c94aa05614512dbf2da82e749
coverage[toml]==7.6.1 \
--hash=sha256:07e2ca0ad381b91350c0ed49d52699b625aab2b44b65e1b4e02fa9df0e92ad2d \
--hash=sha256:0c0420b573964c760df9e9e86d1a9a622d0d27f417e1a949a8a66dd7bcee7bc6 \
Expand Down
Loading