Skip to content

#96 rev i2c bus #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 204 additions & 16 deletions arduino_alvik/arduino_alvik.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __new__(cls):
return cls._instance

def __init__(self):
self.i2c = _ArduinoAlvikI2C(A4, A5)
self._packeter = ucPack(200)
self.left_wheel = _ArduinoAlvikWheel(self._packeter, ord('L'))
self.right_wheel = _ArduinoAlvikWheel(self._packeter, ord('R'))
Expand Down Expand Up @@ -100,54 +101,66 @@ def _progress_bar(percentage: float) -> None:
word = marks_str + f" {percentage}% \t"
sys.stdout.write(bytes((word.encode('utf-8'))))

def _idle(self, delay_=1, check_on_thread=False) -> None:
def _lenghty_op(self, iterations=10000000) -> int:
result = 0
for i in range(1, iterations):
result += i * i
return result

def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None:
"""
Alvik's idle mode behaviour
:return:
"""

NANO_CHK.value(1)
sleep_ms(500)
self.i2c.set_single_thread(True)

if blocking:
self._lenghty_op(50000)
else:
sleep_ms(500)
led_val = 0

try:
while not self.is_on():

if check_on_thread and not self.__class__._update_thread_running:
break
_ESP32_SDA = Pin(A4, Pin.OUT)
_ESP32_SCL = Pin(A5, Pin.OUT)
_ESP32_SCL.value(1)
_ESP32_SDA.value(1)
sleep_ms(100)
_ESP32_SCL.value(0)
_ESP32_SDA.value(0)

cmd = bytearray(1)
cmd[0] = 0x06
i2c = I2C(0, scl=ESP32_SCL, sda=ESP32_SDA)
i2c.writeto(0x36, cmd)
soc_raw = struct.unpack('h', i2c.readfrom(0x36, 2))[0]

self.i2c.start()
self.i2c.writeto(0x36, cmd)

soc_raw = struct.unpack('h', self.i2c.readfrom(0x36, 2))[0]
soc_perc = soc_raw * 0.00390625
self._progress_bar(round(soc_perc))
sleep_ms(delay_)
if blocking:
self._lenghty_op(10000)
else:
sleep_ms(delay_)
if soc_perc > 97:
LEDG.value(0)
LEDR.value(1)
else:
LEDR.value(led_val)
LEDG.value(1)
led_val = (led_val + 1) % 2
print("********** Alvik is on **********")
self.i2c.set_single_thread(False)
if self.is_on():
print("********** Alvik is on **********")
except KeyboardInterrupt:
self.stop()
sys.exit()
except Exception as e:
pass
# print(f'Unable to read SOC: {e}')
print(f'Unable to read SOC: {e}')
finally:
LEDR.value(1)
LEDG.value(1)
NANO_CHK.value(0)
self.i2c.set_single_thread(False)

@staticmethod
def _snake_robot(duration: int = 1000):
Expand Down Expand Up @@ -184,6 +197,7 @@ def begin(self) -> int:
if not self.is_on():
print("\n********** Please turn on your Arduino Alvik! **********\n")
sleep_ms(1000)
self.i2c.set_main_thread(_thread.get_ident())
self._idle(1000)
self._begin_update_thread()

Expand Down Expand Up @@ -579,6 +593,9 @@ def _update(self, delay_=1):
:param delay_: while loop delay (ms)
:return:
"""

self.i2c.set_main_thread(_thread.get_ident())

while True:
if not self.is_on():
print("Alvik is off")
Expand Down Expand Up @@ -1249,6 +1266,177 @@ def _stop_events_thread(cls):
cls._events_thread_running = False


class _ArduinoAlvikI2C:

_main_thread_id = None

def __init__(self, sda: int, scl: int):
"""
Alvik I2C wrapper
:param sda:
:param scl:
"""
self._lock = _thread.allocate_lock()

self._is_single_thread = False

self.sda = sda
self.scl = scl

def set_main_thread(self, thread_id: int):
"""
Sets the main thread of control. It will be the only thread allowed if set_single_thread is True
"""
with self._lock:
self.__class__._main_thread_id = thread_id

def set_single_thread(self, value):
"""
Sets the single thread mode on/off.
In single mode only the main thread is allowed to access the bus
"""
self._is_single_thread = value

def is_accessible(self):
"""
Returns True if bus is accessible by the current thread
"""
return not self._is_single_thread or _thread.get_ident() == self.__class__._main_thread_id

def start(self):
"""
Bitbanging start condition
:return:
"""
_SDA = Pin(self.sda, Pin.OUT)
_SDA.value(1)
sleep_ms(100)
_SDA.value(0)

def init(self, scl, sda, freq=400_000) -> None:
"""
init method just for call compatibility
"""
print("AlvikWarning:: init Unsupported. Alvik defines/initializes its own I2C bus")

def deinit(self):
"""
deinit method just for call compatibility
"""
print("AlvikWarning:: deinit Unsupported. Alvik defines/initializes its own I2C bus")

def stop(self):
""" Bitbanging stop condition (untested)
:return:
"""
_SDA = Pin(self.sda, Pin.OUT)
_SDA.value(0)
sleep_ms(100)
_SDA.value(1)

def scan(self) -> list[int]:
"""
I2C scan method
:return:
"""
if not self.is_accessible():
return []
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.scan()

def readfrom(self, addr, nbytes, stop=True) -> bytes:
"""
Wrapping i2c readfrom
"""
if not self.is_accessible():
return bytes(nbytes)
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.readfrom(addr, nbytes, stop)

def writeto(self, addr, buf, stop=True) -> int:
"""
Wrapping i2c writeto
"""
if not self.is_accessible():
return 0
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.writeto(addr, buf, stop)

def readinto(self, buf, nack=True) -> None:
"""
Wrapping i2c readinto
"""
if not self.is_accessible():
return
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.readinto(buf, nack)

def write(self, buf) -> int:
"""
Wrapping i2c write
"""
if not self.is_accessible():
return 0
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.write(buf)

def readfrom_into(self, addr, buf, stop=True) -> None:
"""
Wrapping i2c readfrom_into
"""
if not self.is_accessible():
return
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.readfrom_into(addr, buf, stop)

def writevto(self, addr, vector, stop=True) -> int:
"""
Wrapping i2c writevto
"""
if not self.is_accessible():
return 0
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.writevto(addr, vector, stop)

def readfrom_mem(self, addr, memaddr, nbytes, addrsize=8) -> bytes:
"""
Wrapping i2c readfrom_mem
"""
if not self.is_accessible():
return bytes(nbytes)
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.readfrom_mem(addr, memaddr, nbytes, addrsize=addrsize)

def readfrom_mem_into(self, addr, memaddr, buf, addrsize=8) -> None:
"""
Wrapping i2c readfrom_mem_into
"""
if not self.is_accessible():
return
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.readfrom_mem_into(addr, memaddr, buf, addrsize=addrsize)

def writeto_mem(self, addr, memaddr, buf, addrsize=8) -> None:
"""
Wrapping i2c writeto_mem
"""
if not self.is_accessible():
return
with self._lock:
i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT))
return i2c.writeto_mem(addr, memaddr, buf, addrsize=addrsize)



class _ArduinoAlvikServo:

def __init__(self, packeter: ucPack, label: str, servo_id: int, position: list[int | None]):
Expand Down
4 changes: 2 additions & 2 deletions arduino_alvik/pinout_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
RESET_STM32 = Pin(D3, Pin.OUT) # nano D3 -> STM32 NRST
NANO_CHK = Pin(D4, Pin.OUT) # nano D4 -> STM32 NANO_CHK
CHECK_STM32 = Pin(A6, Pin.IN, Pin.PULL_DOWN) # nano A6/D23 -> STM32 ROBOT_CHK
ESP32_SDA = Pin(A4, Pin.OUT) # ESP32_SDA
ESP32_SCL = Pin(A5, Pin.OUT) # ESP32_SCL
# ESP32_SDA = Pin(A4, Pin.OUT) # ESP32_SDA
# ESP32_SCL = Pin(A5, Pin.OUT) # ESP32_SCL

# LEDS
LEDR = Pin(46, Pin.OUT) #RED ESP32 LEDR
Expand Down
25 changes: 25 additions & 0 deletions examples/i2c_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from time import sleep_ms
import sys

from arduino_alvik import ArduinoAlvik

alvik = ArduinoAlvik()
alvik.begin()

sleep_ms(1000)

while True:
try:
out = alvik.i2c.scan()

if len(out) == 0:
print("\nNo device found on I2C")
else:
print("\nList of devices")
for o in out:
print(hex(o))

sleep_ms(100)
except KeyboardInterrupt as e:
alvik.stop()
sys.exit()