diff --git a/arduino_alvik/arduino_alvik.py b/arduino_alvik/arduino_alvik.py index b2bd83e..ff5f1aa 100644 --- a/arduino_alvik/arduino_alvik.py +++ b/arduino_alvik/arduino_alvik.py @@ -26,6 +26,7 @@ def __new__(cls): return cls._instance def __init__(self): + self.i2c = _ArduinoAlvikI2C(A4, A5) self._packeter = ucPack(200) self.left_wheel = _ArduinoAlvikWheel(self._packeter, ord('L')) self.right_wheel = _ArduinoAlvikWheel(self._packeter, ord('R')) @@ -100,36 +101,45 @@ def _progress_bar(percentage: float) -> None: word = marks_str + f" {percentage}% \t" sys.stdout.write(bytes((word.encode('utf-8')))) - def _idle(self, delay_=1, check_on_thread=False) -> None: + def _lenghty_op(self, iterations=10000000) -> int: + result = 0 + for i in range(1, iterations): + result += i * i + return result + + def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None: """ Alvik's idle mode behaviour :return: """ - NANO_CHK.value(1) - sleep_ms(500) + self.i2c.set_single_thread(True) + + if blocking: + self._lenghty_op(50000) + else: + sleep_ms(500) led_val = 0 try: while not self.is_on(): + if check_on_thread and not self.__class__._update_thread_running: break - _ESP32_SDA = Pin(A4, Pin.OUT) - _ESP32_SCL = Pin(A5, Pin.OUT) - _ESP32_SCL.value(1) - _ESP32_SDA.value(1) - sleep_ms(100) - _ESP32_SCL.value(0) - _ESP32_SDA.value(0) cmd = bytearray(1) cmd[0] = 0x06 - i2c = I2C(0, scl=ESP32_SCL, sda=ESP32_SDA) - i2c.writeto(0x36, cmd) - soc_raw = struct.unpack('h', i2c.readfrom(0x36, 2))[0] + + self.i2c.start() + self.i2c.writeto(0x36, cmd) + + soc_raw = struct.unpack('h', self.i2c.readfrom(0x36, 2))[0] soc_perc = soc_raw * 0.00390625 self._progress_bar(round(soc_perc)) - sleep_ms(delay_) + if blocking: + self._lenghty_op(10000) + else: + sleep_ms(delay_) if soc_perc > 97: LEDG.value(0) LEDR.value(1) @@ -137,17 +147,20 @@ def _idle(self, delay_=1, check_on_thread=False) -> None: LEDR.value(led_val) LEDG.value(1) led_val = (led_val + 1) % 2 - print("********** Alvik is on **********") + self.i2c.set_single_thread(False) + if self.is_on(): + print("********** Alvik is on **********") except KeyboardInterrupt: self.stop() sys.exit() except Exception as e: pass - # print(f'Unable to read SOC: {e}') + print(f'Unable to read SOC: {e}') finally: LEDR.value(1) LEDG.value(1) NANO_CHK.value(0) + self.i2c.set_single_thread(False) @staticmethod def _snake_robot(duration: int = 1000): @@ -184,6 +197,7 @@ def begin(self) -> int: if not self.is_on(): print("\n********** Please turn on your Arduino Alvik! **********\n") sleep_ms(1000) + self.i2c.set_main_thread(_thread.get_ident()) self._idle(1000) self._begin_update_thread() @@ -579,6 +593,9 @@ def _update(self, delay_=1): :param delay_: while loop delay (ms) :return: """ + + self.i2c.set_main_thread(_thread.get_ident()) + while True: if not self.is_on(): print("Alvik is off") @@ -1249,6 +1266,177 @@ def _stop_events_thread(cls): cls._events_thread_running = False +class _ArduinoAlvikI2C: + + _main_thread_id = None + + def __init__(self, sda: int, scl: int): + """ + Alvik I2C wrapper + :param sda: + :param scl: + """ + self._lock = _thread.allocate_lock() + + self._is_single_thread = False + + self.sda = sda + self.scl = scl + + def set_main_thread(self, thread_id: int): + """ + Sets the main thread of control. It will be the only thread allowed if set_single_thread is True + """ + with self._lock: + self.__class__._main_thread_id = thread_id + + def set_single_thread(self, value): + """ + Sets the single thread mode on/off. + In single mode only the main thread is allowed to access the bus + """ + self._is_single_thread = value + + def is_accessible(self): + """ + Returns True if bus is accessible by the current thread + """ + return not self._is_single_thread or _thread.get_ident() == self.__class__._main_thread_id + + def start(self): + """ + Bitbanging start condition + :return: + """ + _SDA = Pin(self.sda, Pin.OUT) + _SDA.value(1) + sleep_ms(100) + _SDA.value(0) + + def init(self, scl, sda, freq=400_000) -> None: + """ + init method just for call compatibility + """ + print("AlvikWarning:: init Unsupported. Alvik defines/initializes its own I2C bus") + + def deinit(self): + """ + deinit method just for call compatibility + """ + print("AlvikWarning:: deinit Unsupported. Alvik defines/initializes its own I2C bus") + + def stop(self): + """ Bitbanging stop condition (untested) + :return: + """ + _SDA = Pin(self.sda, Pin.OUT) + _SDA.value(0) + sleep_ms(100) + _SDA.value(1) + + def scan(self) -> list[int]: + """ + I2C scan method + :return: + """ + if not self.is_accessible(): + return [] + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.scan() + + def readfrom(self, addr, nbytes, stop=True) -> bytes: + """ + Wrapping i2c readfrom + """ + if not self.is_accessible(): + return bytes(nbytes) + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.readfrom(addr, nbytes, stop) + + def writeto(self, addr, buf, stop=True) -> int: + """ + Wrapping i2c writeto + """ + if not self.is_accessible(): + return 0 + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.writeto(addr, buf, stop) + + def readinto(self, buf, nack=True) -> None: + """ + Wrapping i2c readinto + """ + if not self.is_accessible(): + return + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.readinto(buf, nack) + + def write(self, buf) -> int: + """ + Wrapping i2c write + """ + if not self.is_accessible(): + return 0 + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.write(buf) + + def readfrom_into(self, addr, buf, stop=True) -> None: + """ + Wrapping i2c readfrom_into + """ + if not self.is_accessible(): + return + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.readfrom_into(addr, buf, stop) + + def writevto(self, addr, vector, stop=True) -> int: + """ + Wrapping i2c writevto + """ + if not self.is_accessible(): + return 0 + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.writevto(addr, vector, stop) + + def readfrom_mem(self, addr, memaddr, nbytes, addrsize=8) -> bytes: + """ + Wrapping i2c readfrom_mem + """ + if not self.is_accessible(): + return bytes(nbytes) + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.readfrom_mem(addr, memaddr, nbytes, addrsize=addrsize) + + def readfrom_mem_into(self, addr, memaddr, buf, addrsize=8) -> None: + """ + Wrapping i2c readfrom_mem_into + """ + if not self.is_accessible(): + return + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.readfrom_mem_into(addr, memaddr, buf, addrsize=addrsize) + + def writeto_mem(self, addr, memaddr, buf, addrsize=8) -> None: + """ + Wrapping i2c writeto_mem + """ + if not self.is_accessible(): + return + with self._lock: + i2c = I2C(0, scl=Pin(self.scl, Pin.OUT), sda=Pin(self.sda, Pin.OUT)) + return i2c.writeto_mem(addr, memaddr, buf, addrsize=addrsize) + + + class _ArduinoAlvikServo: def __init__(self, packeter: ucPack, label: str, servo_id: int, position: list[int | None]): diff --git a/arduino_alvik/pinout_definitions.py b/arduino_alvik/pinout_definitions.py index 06d24a3..8892a38 100644 --- a/arduino_alvik/pinout_definitions.py +++ b/arduino_alvik/pinout_definitions.py @@ -13,8 +13,8 @@ RESET_STM32 = Pin(D3, Pin.OUT) # nano D3 -> STM32 NRST NANO_CHK = Pin(D4, Pin.OUT) # nano D4 -> STM32 NANO_CHK CHECK_STM32 = Pin(A6, Pin.IN, Pin.PULL_DOWN) # nano A6/D23 -> STM32 ROBOT_CHK -ESP32_SDA = Pin(A4, Pin.OUT) # ESP32_SDA -ESP32_SCL = Pin(A5, Pin.OUT) # ESP32_SCL +# ESP32_SDA = Pin(A4, Pin.OUT) # ESP32_SDA +# ESP32_SCL = Pin(A5, Pin.OUT) # ESP32_SCL # LEDS LEDR = Pin(46, Pin.OUT) #RED ESP32 LEDR diff --git a/examples/i2c_scan.py b/examples/i2c_scan.py new file mode 100644 index 0000000..a246787 --- /dev/null +++ b/examples/i2c_scan.py @@ -0,0 +1,25 @@ +from time import sleep_ms +import sys + +from arduino_alvik import ArduinoAlvik + +alvik = ArduinoAlvik() +alvik.begin() + +sleep_ms(1000) + +while True: + try: + out = alvik.i2c.scan() + + if len(out) == 0: + print("\nNo device found on I2C") + else: + print("\nList of devices") + for o in out: + print(hex(o)) + + sleep_ms(100) + except KeyboardInterrupt as e: + alvik.stop() + sys.exit() \ No newline at end of file