Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions feeph/i2c/burst_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,16 @@ def read_register(self, register: int, byte_count: int = 1, max_tries: int = 5)
- may raise a RuntimeError if there were too many errors
"""
_validate_inputs(register=register, value=0, byte_count=byte_count, max_tries=max_tries)
if byte_count > 1:
LH.warning("Multi byte reads are not implemented yet! Returning a single byte instead.")
byte_count = 1
for cur_try in range(1, 1 + max_tries):
try:
buf_r = bytearray(1)
buf_r = bytearray(byte_count)
buf_r[0] = register
buf_w = bytearray(byte_count)
self._i2c_bus.writeto_then_readfrom(address=self._i2c_adr, buffer_out=buf_r, buffer_in=buf_w)
# TODO properly handle multi byte reads
return buf_w[0]
value = 0
for i in range(byte_count):
value += buf_w[i] << i*8
return value
except OSError as e:
# [Errno 121] Remote I/O error
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
Expand All @@ -91,15 +90,16 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
- may raise a RuntimeError if there were too many errors
"""
_validate_inputs(register=register, value=value, byte_count=byte_count, max_tries=max_tries)
if byte_count > 1:
LH.warning("Multi byte writes are not implemented yet! Returning a single byte instead.")
byte_count = 1
for cur_try in range(1, 1 + max_tries):
try:
buf = bytearray(1 + byte_count)
buf = bytearray(1 + byte_count) # buf[0], buf[1], buf[2]
buf[0] = register
buf[1] = value & 0xFF
# TODO properly handle multi byte reads
# need to populate the bytes in reverse order:
# 0x##.. => buf[2]
# 0x..## => buf[1]
for i in range(byte_count, 0, -1):
buf[i] = value & 0xFF
value = value >> 8
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
return
except OSError as e:
Expand All @@ -112,6 +112,9 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
else:
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")

# it is unclear if it's possible to have a multi-byte state registers
# (a register write looks exactly like a multi-byte state write)

def get_state(self, byte_count: int = 1, max_tries: int = 5) -> int:
"""
get current state of I²C device identified by `i2c_adr` and
Expand All @@ -127,7 +130,6 @@ def get_state(self, byte_count: int = 1, max_tries: int = 5) -> int:
for cur_try in range(1, 1 + max_tries):
try:
buf = bytearray(byte_count)
# TODO properly handle multi byte reads
self._i2c_bus.readfrom_into(address=self._i2c_adr, buffer=buf)
return buf[0]
except OSError as e:
Expand Down Expand Up @@ -155,7 +157,6 @@ def set_state(self, value: int, byte_count: int = 1, max_tries: int = 3):
try:
buf = bytearray(byte_count)
buf[0] = value & 0xFF
# TODO properly handle multi byte reads
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
return
except OSError as e:
Expand Down Expand Up @@ -228,7 +229,7 @@ def __exit__(self, exc_type, exc_value, exc_tb):


def _validate_inputs(register: int, value: int, byte_count: int = 1, max_tries: int = 3):
if register < 0 or register > 255:
if register < 0 or register > pow(255, byte_count):
raise ValueError(f"Provided I²C device register {register} is out of range! (allowed range: 0 ≤ x ≤ 255)")
max_value = pow(256, byte_count) - 1
if value < 0 or value > max_value:
Expand Down
20 changes: 17 additions & 3 deletions feeph/i2c/emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ def readfrom_into(self, address, buffer, *, start=0, end=None, stop=True):
"""
i2c_device_address = address
i2c_device_register = -1
buffer[0] = self._state[i2c_device_address][i2c_device_register]
value = self._state[i2c_device_address][i2c_device_register]
for i in range(len(buffer)):
buffer[i] = value & 0xff
value = value >> 8

def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
"""
Expand All @@ -70,7 +73,15 @@ def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
i2c_device_register = buffer[0]
if i2c_device_register < 0:
raise ValueError("device register can't be negative")
value = buffer[1]
value = 0
# i must count up (0->len-1) to calculate the correct bitshift
# offset must count down (len->1) when populating the buffer
# 0x..## -> buf[2] -> i = 0, offset = 2
# 0x##.. -> buf[1] -> i = 1, offset = 1
# (buf[0] contains the register address)
for i in range(len(buffer) - 1):
offset = len(buffer) - 1 - i
value += buffer[offset] << i*8
self._state[i2c_device_address][i2c_device_register] = value

def writeto_then_readfrom(self, address: int, buffer_out: bytearray, buffer_in: bytearray, *, out_start=0, out_end=None, in_start=0, in_end=None, stop=False):
Expand All @@ -81,4 +92,7 @@ def writeto_then_readfrom(self, address: int, buffer_out: bytearray, buffer_in:
i2c_device_register = buffer_out[0]
if i2c_device_register < 0:
raise ValueError("device register can't be negative")
buffer_in[0] = self._state[i2c_device_address][i2c_device_register]
value = self._state[i2c_device_address][i2c_device_register]
for i in range(len(buffer_in)):
buffer_in[i] = value & 0xff
value = value >> 8
48 changes: 48 additions & 0 deletions tests/test_burst_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ def test_read_device_register(self):
# -----------------------------------------------------------------
self.assertEqual(computed, expected)

def test_read_device_register_multibyte(self):
state = {
0x4C: {
0x00: 0x1234,
},
}
i2c_bus = sut.EmulatedI2C(state=state)
# -----------------------------------------------------------------
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
computed = bh.read_register(0x00, byte_count=2)
expected = 0x1234
# -----------------------------------------------------------------
self.assertEqual(computed, expected)

def test_read_device_registers(self):
state = {
0x4C: {
Expand Down Expand Up @@ -75,6 +89,21 @@ def test_write_device_register(self):
# -----------------------------------------------------------------
self.assertEqual(computed, expected)

def test_write_device_register_multibyte(self):
state = {
0x4C: {
0x00: 0x0000,
},
}
i2c_bus = sut.EmulatedI2C(state=state)
# -----------------------------------------------------------------
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
bh.write_register(register=0x00, value=0x1234, byte_count=2)
computed = i2c_bus._state[0x4C]
expected = {0x00: 0x1234}
# -----------------------------------------------------------------
self.assertEqual(computed, expected)

def test_write_device_registers(self):
state = {
0x4C: {
Expand Down Expand Up @@ -146,6 +175,18 @@ def test_get_state(self):
# -----------------------------------------------------------------
self.assertEqual(computed, expected)

def test_get_state_multibyte(self):
state = {
0x70: {-1: 0x01},
}
i2c_bus = sut.EmulatedI2C(state=state)
# -----------------------------------------------------------------
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x70) as bh:
computed = bh.get_state(byte_count=2)
expected = 0x01 # byte count was ignored
# -----------------------------------------------------------------
self.assertEqual(computed, expected)

def test_set_state(self):
state = {
0x70: {-1: 0x00},
Expand All @@ -159,6 +200,13 @@ def test_set_state(self):
# -----------------------------------------------------------------
self.assertEqual(computed, expected)

def test_set_state_multibyte(self):
i2c_bus = sut.EmulatedI2C(state={0x70: {-1: 0x00}})
# -----------------------------------------------------------------
# -----------------------------------------------------------------
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x70) as bh:
self.assertRaises(ValueError, bh.set_state, value=0x0102)

# ---------------------------------------------------------------------

def test_no_timeout(self):
Expand Down