From 543a25a05b7d1c5940259bb3086b92e1165b9c34 Mon Sep 17 00:00:00 2001 From: ziirish Date: Fri, 1 Apr 2022 20:13:14 +0200 Subject: [PATCH 01/11] use tinytuya instead of pytuya for now this is just a drop in replacement, but since this new lib offers some discovery functionality as well as a dual mode (local or cloud) --- octoprint_tuyasmartplug/__init__.py | 224 ++++---- octoprint_tuyasmartplug/utils/__init__.py | 0 octoprint_tuyasmartplug/utils/pytuya.py | 629 ---------------------- setup.py | 47 +- 4 files changed, 139 insertions(+), 761 deletions(-) delete mode 100644 octoprint_tuyasmartplug/utils/__init__.py delete mode 100644 octoprint_tuyasmartplug/utils/pytuya.py diff --git a/octoprint_tuyasmartplug/__init__.py b/octoprint_tuyasmartplug/__init__.py index 659883d..5d980e6 100644 --- a/octoprint_tuyasmartplug/__init__.py +++ b/octoprint_tuyasmartplug/__init__.py @@ -10,8 +10,7 @@ import re import threading import time - -from octoprint_tuyasmartplug.utils import pytuya +import tinytuya class tuyasmartplugPlugin( @@ -167,15 +166,17 @@ def turn_on(self, pluglabel): int(plug["autoConnectDelay"]), self._printer.connect ) c.start() - if plug["sysCmdOn"]: - t = threading.Timer( - int(plug["sysCmdOnDelay"]), os.system, args=[plug["sysRunCmdOn"]] - ) - t.start() - else: - self._plugin_manager.send_plugin_message( - self._identifier, dict(currentState="unknown", label=pluglabel) - ) + if plug["sysCmdOn"]: + t = threading.Timer( + int(plug["sysCmdOnDelay"]), + os.system, + args=[plug["sysRunCmdOn"]], + ) + t.start() + else: + self._plugin_manager.send_plugin_message( + self._identifier, dict(currentState="unknown", label=pluglabel) + ) def turn_off(self, pluglabel): self._tuyasmartplug_logger.debug("Turning off %s." % pluglabel) @@ -199,9 +200,9 @@ def turn_off(self, pluglabel): int(plug["sysCmdOffDelay"]), os.system, args=[plug["sysRunCmdOff"]] ) t.start() - if plug["autoDisconnect"]: - self._printer.disconnect() - time.sleep(int(plug["autoDisconnectDelay"])) + if plug["autoDisconnect"]: + self._printer.disconnect() + time.sleep(int(plug["autoDisconnectDelay"])) if not plug["useCountdownRules"]: chk = self.sendCommand("off", plug["label"]) @@ -217,7 +218,10 @@ def check_status(self, pluglabel, resp=None): self._tuyasmartplug_logger.debug("Checking status of %s." % pluglabel) if pluglabel != "": response = resp or self.sendCommand("info", pluglabel) - if response is False: + if not isinstance(response, dict) or "Error" in response: + self._tuyasmartplug_logger.warning( + "Unable to check device status: %s" % response + ) self._plugin_manager.send_plugin_message( self._identifier, dict(currentState="unknown", label=pluglabel) ) @@ -264,14 +268,14 @@ def plug_search(self, lst, key, value): if item[key] == value: return item - def sendCommand(self, cmd, pluglabel, args=None, tries=1): + def sendCommand(self, cmd, pluglabel, args=None): self._tuyasmartplug_logger.debug("Sending command: %s to %s" % (cmd, pluglabel)) plug = self.plug_search( self._settings.get(["arrSmartplugs"]), "label", pluglabel ) - device = pytuya.OutletDevice(plug["id"], plug["ip"], plug["localKey"]) + device = tinytuya.OutletDevice(plug["id"], plug["ip"], plug["localKey"]) if plug.get("v33"): - device.version = 3.3 + device.set_version(3.3) commands = { "info": ("status", None), @@ -280,39 +284,21 @@ def sendCommand(self, cmd, pluglabel, args=None, tries=1): "countdown": ("set_timer", None), } - try: - command, arg = commands[cmd] - func = getattr(device, command, None) - if not func: - self._tuyasmartplug_logger.debug("No such command '%s'" % command) - return False - if args: - func(args) - elif arg is not None: - func(arg, plug["slot"]) - else: - func() + command, arg = commands[cmd] + func = getattr(device, command, None) + if not func: + self._tuyasmartplug_logger.debug("No such command '%s'" % command) + return False + if args: + func(args) + elif arg is not None: + func(arg, plug["slot"]) + else: + func() time.sleep(0.5) ret = device.status() self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) return ret - except socket.error as e: - if e.errno == 104: - if tries <= 3: - self._tuyasmartplug_logger.debug( - "Connection refused... Trying again soon" - ) - time.sleep(1) - return self.sendCommand(cmd, pluglabel, args, tries + 1) - self._tuyasmartplug_logger.debug("Too many failed attempts") - return False - self._tuyasmartplug_logger.debug("Network error") - return False - except: - self._tuyasmartplug_logger.debug( - "Something went wrong while running the command" - ) - return False # ~~ Gcode processing hook @@ -338,69 +324,87 @@ def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwar plug = self.plug_search( self._settings.get(["arrSmartplugs"]), "label", name ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOnDelay"]), self.turn_on, args=[plug["label"]] - ) - t.start() - return - elif cmd.startswith("M81"): - name = re.sub(r"^M81\s?", "", cmd) - self._tuyasmartplug_logger.debug( - "Received M81 command, attempting power off of %s." % name - ) - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "ip", name - ) - if not plug: - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "label", name - ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOffDelay"]), self.gcode_turn_off, args=[plug] - ) - t.start() - return - else: - return - - elif cmd.startswith("@TUYAON"): - name = re.sub(r"^@TUYAON\s?", "", cmd) - self._tuyasmartplug_logger.debug( - "Received @TUYAON command, attempting power on of %s." % name - ) - plug = self.plug_search(self._settings.get(["arrSmartplugs"]), "ip", name) - if not plug: - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "label", name - ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOnDelay"]), self.turn_on, args=[plug["label"]] - ) - t.start() - return None - elif cmd.startswith("@TUYAOFF"): - name = re.sub(r"^@TUYAOFF\s?", "", cmd) - self._tuyasmartplug_logger.debug( - "Received TUYAOFF command, attempting power off of %s." % name - ) - plug = self.plug_search(self._settings.get(["arrSmartplugs"]), "ip", name) - if not plug: - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "label", name - ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOffDelay"]), self.gcode_turn_off, args=[plug] - ) - t.start() - return None + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + t = threading.Timer( + int(plug["gcodeOnDelay"]), + self.turn_on, + args=[plug["label"]], + ) + t.start() + return + elif cmd.startswith("M81"): + name = re.sub(r"^M81\s?", "", cmd) + self._tuyasmartplug_logger.debug( + "Received M81 command, attempting power off of %s." % name + ) + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "ip", name + ) + if not plug: + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "label", name + ) + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + t = threading.Timer( + int(plug["gcodeOffDelay"]), + self.gcode_turn_off, + args=[plug], + ) + t.start() + return + else: + return + + elif cmd.startswith("@TUYAON"): + name = re.sub(r"^@TUYAON\s?", "", cmd) + self._tuyasmartplug_logger.debug( + "Received @TUYAON command, attempting power on of %s." + % name + ) + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "ip", name + ) + if not plug: + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "label", name + ) + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + t = threading.Timer( + int(plug["gcodeOnDelay"]), + self.turn_on, + args=[plug["label"]], + ) + t.start() + return None + elif cmd.startswith("@TUYAOFF"): + name = re.sub(r"^@TUYAOFF\s?", "", cmd) + self._tuyasmartplug_logger.debug( + "Received TUYAOFF command, attempting power off of %s." + % name + ) + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), + "ip", + name, + ) + if not plug: + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), + "label", + name, + ) + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + t = threading.Timer( + int(plug["gcodeOffDelay"]), + self.gcode_turn_off, + args=[plug], + ) + t.start() + return None # ~~ Softwareupdate hook diff --git a/octoprint_tuyasmartplug/utils/__init__.py b/octoprint_tuyasmartplug/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/octoprint_tuyasmartplug/utils/pytuya.py b/octoprint_tuyasmartplug/utils/pytuya.py deleted file mode 100644 index 665c570..0000000 --- a/octoprint_tuyasmartplug/utils/pytuya.py +++ /dev/null @@ -1,629 +0,0 @@ -# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices -# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U -# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug -# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa -# -# This would not exist without the protocol reverse engineering from -# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes -# -# Tested with Python 2.7 and Python 3.6.1 only - - -import base64 -from hashlib import md5 -import json -import logging -import socket -import sys -import time -import colorsys -import binascii - -try: - # raise ImportError - import Crypto - from Crypto.Cipher import AES # PyCrypto -except ImportError: - Crypto = AES = None - import pyaes # https://github.com/ricmoo/pyaes - -__version__ = "7.0.6" - - -log = logging.getLogger(__name__) -# logging.basicConfig() # TODO include function name/line numbers in log -# log.setLevel(level=logging.DEBUG) # Debug hack! - -log.info("%s version %s", __name__, __version__) -log.info("Python %s on %s", sys.version, sys.platform) -if Crypto is None: - log.info("Using pyaes version %r", pyaes.VERSION) - log.info("Using pyaes from %r", pyaes.__file__) -else: - log.info("Using PyCrypto %r", Crypto.version_info) - log.info("Using PyCrypto from %r", Crypto.__file__) - -SET = "set" -STATUS = "status" - -PROTOCOL_VERSION_BYTES_31 = b"3.1" -PROTOCOL_VERSION_BYTES_33 = b"3.3" - -IS_PY2 = sys.version_info[0] == 2 - - -class AESCipher(object): - def __init__(self, key): - # self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ - self.bs = 16 - self.key = key - - def encrypt(self, raw, use_base64=True): - if Crypto: - raw = self._pad(raw) - cipher = AES.new(self.key, mode=AES.MODE_ECB) - crypted_text = cipher.encrypt(raw) - else: - _ = self._pad(raw) - cipher = pyaes.blockfeeder.Encrypter( - pyaes.AESModeOfOperationECB(self.key) - ) # no IV, auto pads to 16 - crypted_text = cipher.feed(raw) - crypted_text += cipher.feed() # flush final block - # print('crypted_text %r' % crypted_text) - # print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) - if use_base64: - return base64.b64encode(crypted_text) - else: - return crypted_text - - def decrypt(self, enc, use_base64=True): - if use_base64: - enc = base64.b64decode(enc) - # print('enc (%d) %r' % (len(enc), enc)) - # enc = self._unpad(enc) - # enc = self._pad(enc) - # print('upadenc (%d) %r' % (len(enc), enc)) - if Crypto: - cipher = AES.new(self.key, AES.MODE_ECB) - raw = cipher.decrypt(enc) - # print('raw (%d) %r' % (len(raw), raw)) - return self._unpad(raw).decode("utf-8") - # return self._unpad(cipher.decrypt(enc)).decode('utf-8') - else: - cipher = pyaes.blockfeeder.Decrypter( - pyaes.AESModeOfOperationECB(self.key) - ) # no IV, auto pads to 16 - plain_text = cipher.feed(enc) - plain_text += cipher.feed() # flush final block - return plain_text - - def _pad(self, s): - padnum = self.bs - len(s) % self.bs - return s + padnum * chr(padnum).encode() - - @staticmethod - def _unpad(s): - return s[: -ord(s[len(s) - 1 :])] - - -def bin2hex(x, pretty=False): - if pretty: - space = " " - else: - space = "" - if IS_PY2: - result = "".join("%02X%s" % (ord(y), space) for y in x) - else: - result = "".join("%02X%s" % (y, space) for y in x) - return result - - -def hex2bin(x): - if IS_PY2: - return x.decode("hex") - else: - return bytes.fromhex(x) - - -# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi -payload_dict = { - "device": { - "status": {"hexByte": "0a", "command": {"gwId": "", "devId": ""}}, - "set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}}, - "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) - "suffix": "000000000000aa55", - } -} - - -class XenonDevice(object): - def __init__( - self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10 - ): - """ - Represents a Tuya device. - - Args: - dev_id (str): The device id. - address (str): The network address. - local_key (str, optional): The encryption key. Defaults to None. - dev_type (str, optional): The device type. - It will be used as key for lookups in payload_dict. - Defaults to None. - - Attributes: - port (int): The port to connect to. - """ - self.id = dev_id - self.address = address - self.local_key = local_key - self.local_key = local_key.encode("latin1") - self.dev_type = dev_type - self.connection_timeout = connection_timeout - self.version = 3.1 - - self.port = 6668 # default - do not expect caller to pass in - - def __repr__(self): - return "%r" % ((self.id, self.address),) # FIXME can do better than this - - def _send_receive(self, payload): - """ - Send single buffer `payload` and receive a single buffer. - - Args: - payload(bytes): Data to send. - """ - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - s.settimeout(self.connection_timeout) - s.connect((self.address, self.port)) - s.send(payload) - data = s.recv(1024) - s.close() - return data - - def set_version(self, version): - self.version = version - - def generate_payload(self, command, data=None): - """ - Generate the payload to send. - - Args: - command(str): The type of command. - This is one of the entries from payload_dict - data(dict, optional): The data to be send. - This is what will be passed via the 'dps' entry - """ - json_data = payload_dict[self.dev_type][command]["command"] - - if "gwId" in json_data: - json_data["gwId"] = self.id - if "devId" in json_data: - json_data["devId"] = self.id - if "uid" in json_data: - json_data["uid"] = self.id # still use id, no seperate uid - if "t" in json_data: - json_data["t"] = str(int(time.time())) - - if data is not None: - json_data["dps"] = data - - # Create byte buffer from hex data - json_payload = json.dumps(json_data) - # print(json_payload) - json_payload = json_payload.replace( - " ", "" - ) # if spaces are not removed device does not respond! - json_payload = json_payload.encode("utf-8") - log.debug("json_payload=%r", json_payload) - - if self.version == 3.3: - self.cipher = AESCipher( - self.local_key - ) # expect to connect and then disconnect to set new - json_payload = self.cipher.encrypt(json_payload, False) - self.cipher = None - if command != STATUS: - # add the 3.3 header - json_payload = ( - PROTOCOL_VERSION_BYTES_33 - + b"\0\0\0\0\0\0\0\0\0\0\0\0" - + json_payload - ) - elif command == SET: - # need to encrypt - # print('json_payload %r' % json_payload) - self.cipher = AESCipher( - self.local_key - ) # expect to connect and then disconnect to set new - json_payload = self.cipher.encrypt(json_payload) - # print('crypted json_payload %r' % json_payload) - preMd5String = ( - b"data=" - + json_payload - + b"||lpv=" - + PROTOCOL_VERSION_BYTES_31 - + b"||" - + self.local_key - ) - # print('preMd5String %r' % preMd5String) - m = md5() - m.update(preMd5String) - # print(repr(m.digest())) - hexdigest = m.hexdigest() - # print(hexdigest) - # print(hexdigest[8:][:16]) - json_payload = ( - PROTOCOL_VERSION_BYTES_31 - + hexdigest[8:][:16].encode("latin1") - + json_payload - ) - # print('data_to_send') - # print(json_payload) - # print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) - # print('json_payload %r' % repr(json_payload)) - # print('json_payload len %r' % len(json_payload)) - # print(bin2hex(json_payload)) - self.cipher = None # expect to connect and then disconnect to set new - - postfix_payload = hex2bin( - bin2hex(json_payload) + payload_dict[self.dev_type]["suffix"] - ) - # print('postfix_payload %r' % postfix_payload) - # print('postfix_payload %r' % len(postfix_payload)) - # print('postfix_payload %x' % len(postfix_payload)) - # print('postfix_payload %r' % hex(len(postfix_payload))) - assert len(postfix_payload) <= 0xFF - postfix_payload_hex_len = "%x" % len( - postfix_payload - ) # TODO this assumes a single byte 0-255 (0x00-0xff) - buffer = ( - hex2bin( - payload_dict[self.dev_type]["prefix"] - + payload_dict[self.dev_type][command]["hexByte"] - + "000000" - + postfix_payload_hex_len - ) - + postfix_payload - ) - - # calc the CRC of everything except where the CRC goes and the suffix - hex_crc = format(binascii.crc32(buffer[:-8]) & 0xFFFFFFFF, "08X") - buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] - # print('command', command) - # print('prefix') - # print(payload_dict[self.dev_type][command]['prefix']) - # print(repr(buffer)) - # print(bin2hex(buffer, pretty=True)) - # print(bin2hex(buffer, pretty=False)) - # print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer))) - return buffer - - -class Device(XenonDevice): - def __init__(self, dev_id, address, local_key=None, dev_type=None): - super(Device, self).__init__(dev_id, address, local_key, dev_type) - - def status(self): - log.debug("status() entry") - # open device, send request, then close connection - payload = self.generate_payload("status") - - data = self._send_receive(payload) - log.debug("status received data=%r", data) - - result = data[20:-8] # hard coded offsets - log.debug("result=%r", result) - # result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer - # print('result %r' % result) - if result.startswith(b"{"): - # this is the regular expected code path - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - elif result.startswith(PROTOCOL_VERSION_BYTES_31): - # got an encrypted payload, happens occasionally - # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} - # NOTE dps.2 may or may not be present - result = result[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header - result = result[ - 16: - ] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload - cipher = AESCipher(self.local_key) - result = cipher.decrypt(result) - log.debug("decrypted result=%r", result) - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - elif self.version == 3.3: - cipher = AESCipher(self.local_key) - result = cipher.decrypt(result, False) - log.debug("decrypted result=%r", result) - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - else: - log.error("Unexpected status() payload=%r", result) - - return result - - def set_status(self, on, switch=1): - """ - Set status of the device to 'on' or 'off'. - - Args: - on(bool): True for 'on', False for 'off'. - switch(int): The switch to set - """ - # open device, send request, then close connection - if isinstance(switch, int): - switch = str(switch) # index and payload is a string - payload = self.generate_payload(SET, {switch: on}) - # print('payload %r' % payload) - - data = self._send_receive(payload) - log.debug("set_status received data=%r", data) - - return data - - def set_value(self, index, value): - """ - Set int value of any index. - - Args: - index(int): index to set - value(int): new value for the index - """ - # open device, send request, then close connection - if isinstance(index, int): - index = str(index) # index and payload is a string - - payload = self.generate_payload(SET, {index: value}) - - data = self._send_receive(payload) - - return data - - def turn_on(self, switch=1): - """Turn the device on""" - self.set_status(True, switch) - - def turn_off(self, switch=1): - """Turn the device off""" - self.set_status(False, switch) - - def set_timer(self, num_secs): - """ - Set a timer. - - Args: - num_secs(int): Number of seconds - """ - # FIXME / TODO support schemas? Accept timer id number as parameter? - - # Dumb heuristic; Query status, pick last device id as that is probably the timer - status = self.status() - devices = status["dps"] - devices_numbers = list(devices.keys()) - devices_numbers.sort() - dps_id = devices_numbers[-1] - - payload = self.generate_payload(SET, {dps_id: num_secs}) - - data = self._send_receive(payload) - log.debug("set_timer received data=%r", data) - return data - - -class OutletDevice(Device): - def __init__(self, dev_id, address, local_key=None): - dev_type = "device" - super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) - - -class BulbDevice(Device): - DPS_INDEX_ON = "1" - DPS_INDEX_MODE = "2" - DPS_INDEX_BRIGHTNESS = "3" - DPS_INDEX_COLOURTEMP = "4" - DPS_INDEX_COLOUR = "5" - - DPS = "dps" - DPS_MODE_COLOUR = "colour" - DPS_MODE_WHITE = "white" - - DPS_2_STATE = { - "1": "is_on", - "2": "mode", - "3": "brightness", - "4": "colourtemp", - "5": "colour", - } - - def __init__(self, dev_id, address, local_key=None): - dev_type = "device" - super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) - - @staticmethod - def _rgb_to_hexvalue(r, g, b): - """ - Convert an RGB value to the hex representation expected by tuya. - - Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: - rrggbb0hhhssvv - - While r, g and b are just hexadecimal values of the corresponding - Red, Green and Blue values, the h, s and v values (which are values - between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. - - Args: - r(int): Value for the colour red as int from 0-255. - g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - rgb = [r, g, b] - hsv = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255) - - hexvalue = "" - for value in rgb: - temp = str(hex(int(value))).replace("0x", "") - if len(temp) == 1: - temp = "0" + temp - hexvalue = hexvalue + temp - - hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] - hexvalue_hsv = "" - for value in hsvarray: - temp = str(hex(int(value))).replace("0x", "") - if len(temp) == 1: - temp = "0" + temp - hexvalue_hsv = hexvalue_hsv + temp - if len(hexvalue_hsv) == 7: - hexvalue = hexvalue + "0" + hexvalue_hsv - else: - hexvalue = hexvalue + "00" + hexvalue_hsv - - return hexvalue - - @staticmethod - def _hexvalue_to_rgb(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into - an RGB value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - r = int(hexvalue[0:2], 16) - g = int(hexvalue[2:4], 16) - b = int(hexvalue[4:6], 16) - - return (r, g, b) - - @staticmethod - def _hexvalue_to_hsv(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into - an HSV value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - h = int(hexvalue[7:10], 16) / 360 - s = int(hexvalue[10:12], 16) / 255 - v = int(hexvalue[12:14], 16) / 255 - - return (h, s, v) - - def set_colour(self, r, g, b): - """ - Set colour of an rgb bulb. - - Args: - r(int): Value for the colour red as int from 0-255. - g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - if not 0 <= r <= 255: - raise ValueError("The value for red needs to be between 0 and 255.") - if not 0 <= g <= 255: - raise ValueError("The value for green needs to be between 0 and 255.") - if not 0 <= b <= 255: - raise ValueError("The value for blue needs to be between 0 and 255.") - - # print(BulbDevice) - hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) - - payload = self.generate_payload( - SET, - { - self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR, - self.DPS_INDEX_COLOUR: hexvalue, - }, - ) - data = self._send_receive(payload) - return data - - def set_white(self, brightness, colourtemp): - """ - Set white coloured theme of an rgb bulb. - - Args: - brightness(int): Value for the brightness (25-255). - colourtemp(int): Value for the colour temperature (0-255). - """ - if not 25 <= brightness <= 255: - raise ValueError("The brightness needs to be between 25 and 255.") - if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") - - payload = self.generate_payload( - SET, - { - self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, - self.DPS_INDEX_BRIGHTNESS: brightness, - self.DPS_INDEX_COLOURTEMP: colourtemp, - }, - ) - - data = self._send_receive(payload) - return data - - def set_brightness(self, brightness): - """ - Set the brightness value of an rgb bulb. - - Args: - brightness(int): Value for the brightness (25-255). - """ - if not 25 <= brightness <= 255: - raise ValueError("The brightness needs to be between 25 and 255.") - - payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness}) - data = self._send_receive(payload) - return data - - def set_colourtemp(self, colourtemp): - """ - Set the colour temperature of an rgb bulb. - - Args: - colourtemp(int): Value for the colour temperature (0-255). - """ - if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") - - payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp}) - data = self._send_receive(payload) - return data - - def brightness(self): - """Return brightness value""" - return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] - - def colourtemp(self): - """Return colour temperature""" - return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] - - def colour_rgb(self): - """Return colour as RGB value""" - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_rgb(hexvalue) - - def colour_hsv(self): - """Return colour as HSV value""" - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_hsv(hexvalue) - - def state(self): - status = self.status() - state = {} - - for key in status[self.DPS].keys(): - if int(key) <= 5: - state[self.DPS_2_STATE[key]] = status[self.DPS][key] - - return state diff --git a/setup.py b/setup.py index eeee39c..aae100a 100644 --- a/setup.py +++ b/setup.py @@ -34,8 +34,7 @@ # Any additional requirements besides OctoPrint should be listed here plugin_requires = [ - 'pyaes==1.6.1', - 'pycrypto==2.6.1' + "tinytuya", ] ### -------------------------------------------------------------------------------------------------------------------- @@ -69,31 +68,35 @@ from setuptools import setup try: - import octoprint_setuptools + import octoprint_setuptools except: - print("Could not import OctoPrint's setuptools, are you sure you are running that under " - "the same python installation that OctoPrint is installed under?") - import sys - sys.exit(-1) + print( + "Could not import OctoPrint's setuptools, are you sure you are running that under " + "the same python installation that OctoPrint is installed under?" + ) + import sys + + sys.exit(-1) setup_parameters = octoprint_setuptools.create_plugin_setup_parameters( - identifier=plugin_identifier, - package=plugin_package, - name=plugin_name, - version=plugin_version, - description=plugin_description, - author=plugin_author, - mail=plugin_author_email, - url=plugin_url, - license=plugin_license, - requires=plugin_requires, - additional_packages=plugin_additional_packages, - ignored_packages=plugin_ignored_packages, - additional_data=plugin_additional_data + identifier=plugin_identifier, + package=plugin_package, + name=plugin_name, + version=plugin_version, + description=plugin_description, + author=plugin_author, + mail=plugin_author_email, + url=plugin_url, + license=plugin_license, + requires=plugin_requires, + additional_packages=plugin_additional_packages, + ignored_packages=plugin_ignored_packages, + additional_data=plugin_additional_data, ) if len(additional_setup_parameters): - from octoprint.util import dict_merge - setup_parameters = dict_merge(setup_parameters, additional_setup_parameters) + from octoprint.util import dict_merge + + setup_parameters = dict_merge(setup_parameters, additional_setup_parameters) setup(**setup_parameters) From 0471cb1ecb4befc74add354ed61ea0356c15ed1a Mon Sep 17 00:00:00 2001 From: Carlsans <46719148+Carlsans@users.noreply.github.com> Date: Fri, 9 Feb 2024 11:00:28 -0500 Subject: [PATCH 02/11] Update __init__.py Changed version 3.3 by 3.4 --- octoprint_tuyasmartplug/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/octoprint_tuyasmartplug/__init__.py b/octoprint_tuyasmartplug/__init__.py index 5d980e6..e733897 100644 --- a/octoprint_tuyasmartplug/__init__.py +++ b/octoprint_tuyasmartplug/__init__.py @@ -275,7 +275,7 @@ def sendCommand(self, cmd, pluglabel, args=None): ) device = tinytuya.OutletDevice(plug["id"], plug["ip"], plug["localKey"]) if plug.get("v33"): - device.set_version(3.3) + device.set_version(3.4) commands = { "info": ("status", None), From 1bb6626f87ad50c6bc3efbb758a88b3bb67133be Mon Sep 17 00:00:00 2001 From: Carlsans <46719148+Carlsans@users.noreply.github.com> Date: Thu, 15 Feb 2024 15:43:42 -0500 Subject: [PATCH 03/11] Now working with multiple plug versions --- octoprint_tuyasmartplug/__init__.py | 124 +++++++++--------- .../templates/tuyasmartplug_settings.jinja2 | 15 ++- 2 files changed, 75 insertions(+), 64 deletions(-) diff --git a/octoprint_tuyasmartplug/__init__.py b/octoprint_tuyasmartplug/__init__.py index e733897..6571f3b 100644 --- a/octoprint_tuyasmartplug/__init__.py +++ b/octoprint_tuyasmartplug/__init__.py @@ -69,7 +69,7 @@ def get_settings_defaults(self): "displayWarning": True, "warnPrinting": False, "gcodeEnabled": False, - "v33": False, + "plugversion": "1.4", "gcodeOnDelay": 0, "gcodeOffDelay": 0, "autoConnect": True, @@ -274,8 +274,9 @@ def sendCommand(self, cmd, pluglabel, args=None): self._settings.get(["arrSmartplugs"]), "label", pluglabel ) device = tinytuya.OutletDevice(plug["id"], plug["ip"], plug["localKey"]) - if plug.get("v33"): - device.set_version(3.4) + #if plug.get("v33"): + self._tuyasmartplug_logger.debug("Plug version "+str(plug["plugversion"])) + device.set_version(float(plug["plugversion"])) commands = { "info": ("status", None), @@ -312,6 +313,7 @@ def gcode_turn_off(self, plug): def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs): if gcode: + self._tuyasmartplug_logger.debug(str(cmd)) if cmd.startswith("M80"): name = re.sub(r"^M80\s?", "", cmd) self._tuyasmartplug_logger.debug( @@ -333,78 +335,80 @@ def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwar ) t.start() return - elif cmd.startswith("M81"): - name = re.sub(r"^M81\s?", "", cmd) + elif cmd.startswith("M81"): + name = re.sub(r"^M81\s?", "", cmd) + self._tuyasmartplug_logger.debug( + "Received M81 command, attempting power off of %s." % name + ) + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "ip", name + ) + if not plug: + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "label", name + ) + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + #self.gcode_turn_off(plug) + t = threading.Timer( + int(plug["gcodeOffDelay"]), + self.gcode_turn_off, + args=[plug], + ) + t.start() + return + else: + return + + elif cmd.startswith("@TUYAON"): + name = re.sub(r"^@TUYAON\s?", "", cmd) + self._tuyasmartplug_logger.debug( + "Received @TUYAON command, attempting power on of %s." + % name + ) + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "ip", name + ) + if not plug: + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "label", name + ) + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + t = threading.Timer( + int(plug["gcodeOnDelay"]), + self.turn_on, + args=[plug["label"]], + ) + t.start() + return None + elif cmd.startswith("@TUYAOFF"): + name = re.sub(r"^@TUYAOFF\s?", "", cmd) self._tuyasmartplug_logger.debug( - "Received M81 command, attempting power off of %s." % name + "Received TUYAOFF command, attempting power off of %s." + % name ) plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "ip", name + self._settings.get(["arrSmartplugs"]), + "ip", + name, ) if not plug: plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "label", name + self._settings.get(["arrSmartplugs"]), + "label", + name, ) self._tuyasmartplug_logger.debug(plug) if plug["gcodeEnabled"]: + #self.gcode_turn_off(plug) t = threading.Timer( int(plug["gcodeOffDelay"]), self.gcode_turn_off, args=[plug], ) t.start() - return - else: - return - - elif cmd.startswith("@TUYAON"): - name = re.sub(r"^@TUYAON\s?", "", cmd) - self._tuyasmartplug_logger.debug( - "Received @TUYAON command, attempting power on of %s." - % name - ) - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "ip", name - ) - if not plug: - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "label", name - ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOnDelay"]), - self.turn_on, - args=[plug["label"]], - ) - t.start() - return None - elif cmd.startswith("@TUYAOFF"): - name = re.sub(r"^@TUYAOFF\s?", "", cmd) - self._tuyasmartplug_logger.debug( - "Received TUYAOFF command, attempting power off of %s." - % name - ) - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), - "ip", - name, - ) - if not plug: - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), - "label", - name, - ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOffDelay"]), - self.gcode_turn_off, - args=[plug], - ) - t.start() - return None + return None # ~~ Softwareupdate hook diff --git a/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 b/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 index acbf0d5..7bbddee 100644 --- a/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 +++ b/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 @@ -11,7 +11,6 @@ - @@ -70,7 +69,15 @@
-
+
+ +
@@ -92,8 +99,8 @@ -
-
+
+
From 8e05d3a4678def6a08778c70ec6f8e8fe73187ed Mon Sep 17 00:00:00 2001 From: Carlsans <46719148+Carlsans@users.noreply.github.com> Date: Sat, 17 Feb 2024 13:48:17 -0500 Subject: [PATCH 04/11] Update setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index aae100a..6f49f4e 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ plugin_name = "OctoPrint-TuyaSmartplug" # The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module -plugin_version = "0.3.0" +plugin_version = "0.4.0" # The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin # module From 6ae67966736ff865b46a4816f07bc06a16475be7 Mon Sep 17 00:00:00 2001 From: Carlsans <46719148+Carlsans@users.noreply.github.com> Date: Sun, 18 Feb 2024 23:09:17 -0500 Subject: [PATCH 05/11] Update __init__.py Updated version number (will load default values) --- octoprint_tuyasmartplug/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/octoprint_tuyasmartplug/__init__.py b/octoprint_tuyasmartplug/__init__.py index 6571f3b..240c70f 100644 --- a/octoprint_tuyasmartplug/__init__.py +++ b/octoprint_tuyasmartplug/__init__.py @@ -115,7 +115,7 @@ def on_settings_save(self, data): self._tuyasmartplug_logger.setLevel(logging.INFO) def get_settings_version(self): - return 3 + return 4 def on_settings_migrate(self, target, current=None): if current is None or current < self.get_settings_version(): From fd1fbf9d443377b514f2e0bcacf521246813d32f Mon Sep 17 00:00:00 2001 From: Carlsans <46719148+Carlsans@users.noreply.github.com> Date: Mon, 19 Feb 2024 00:59:27 -0500 Subject: [PATCH 06/11] Corrected many bugs, changed version number Will force to erase old data from your previous installation --- octoprint_tuyasmartplug/__init__.py | 76 +++++++++++++++++------------ 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/octoprint_tuyasmartplug/__init__.py b/octoprint_tuyasmartplug/__init__.py index 240c70f..fe4ec1b 100644 --- a/octoprint_tuyasmartplug/__init__.py +++ b/octoprint_tuyasmartplug/__init__.py @@ -11,6 +11,9 @@ import threading import time import tinytuya +import tinytuya.scanner +from io import StringIO +import sys class tuyasmartplugPlugin( @@ -152,13 +155,10 @@ def turn_on(self, pluglabel): self._settings.get(["arrSmartplugs"]), "label", pluglabel ) self._tuyasmartplug_logger.debug(plug) - if plug["useCountdownRules"]: - chk = self.sendCommand( - "countdown", plug["label"], int(plug["countdownOnDelay"]) - ) - else: - chk = self.sendCommand("on", plug["label"]) - + if plug["useCountdownRules"] : + time.sleep(int(plug["countdownOnDelay"])) + chk = self.sendCommand("on", plug["label"]) + self._tuyasmartplug_logger.debug("chk:"+str(chk)) if chk is not False: self.check_status(plug["label"], chk) if plug["autoConnect"]: @@ -173,10 +173,10 @@ def turn_on(self, pluglabel): args=[plug["sysRunCmdOn"]], ) t.start() - else: - self._plugin_manager.send_plugin_message( - self._identifier, dict(currentState="unknown", label=pluglabel) - ) + else: + self._plugin_manager.send_plugin_message( + self._identifier, dict(currentState="unknown", label=pluglabel) + ) def turn_off(self, pluglabel): self._tuyasmartplug_logger.debug("Turning off %s." % pluglabel) @@ -191,9 +191,7 @@ def turn_off(self, pluglabel): ) self._tuyasmartplug_logger.debug(plug) if plug["useCountdownRules"]: - chk = self.sendCommand( - "countdown", plug["label"], int(plug["countdownOffDelay"]) - ) + time.sleep(int(plug["countdownOffDelay"])) if plug["sysCmdOff"]: t = threading.Timer( @@ -204,8 +202,8 @@ def turn_off(self, pluglabel): self._printer.disconnect() time.sleep(int(plug["autoDisconnectDelay"])) - if not plug["useCountdownRules"]: - chk = self.sendCommand("off", plug["label"]) + + chk = self.sendCommand("off", plug["label"]) if chk is not False: self.check_status(plug["label"], chk) @@ -285,21 +283,29 @@ def sendCommand(self, cmd, pluglabel, args=None): "countdown": ("set_timer", None), } - command, arg = commands[cmd] - func = getattr(device, command, None) - if not func: - self._tuyasmartplug_logger.debug("No such command '%s'" % command) - return False - if args: - func(args) - elif arg is not None: - func(arg, plug["slot"]) - else: - func() - time.sleep(0.5) + if cmd == "on": + ret = device.turn_on(int(plug["slot"])) + if cmd == "off": + ret = device.turn_off(int(plug["slot"])) + if cmd == "info": ret = device.status() - self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) - return ret + self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) + return ret + # command, arg = commands[cmd] + # func = getattr(device, command, None) + # if not func: + # self._tuyasmartplug_logger.debug("No such command '%s'" % command) + # return False + # if args: + # func(args) + # elif arg is not None: + # func(arg, plug["slot"]) + # else: + # func() + # time.sleep(0.5) + # ret = device.status() + # self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) + # return ret # ~~ Gcode processing hook @@ -314,6 +320,16 @@ def gcode_turn_off(self, plug): def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs): if gcode: self._tuyasmartplug_logger.debug(str(cmd)) + if cmd.startswith("M123"): + tmp = sys.stdout + my_result = StringIO() + sys.stdout = my_result + tinytuya.scanner.scan() + sys.stdout = tmp + print(my_result.getvalue()) + self._tuyasmartplug_logger.debug(my_result.getvalue()) + return my_result.getvalue() + if cmd.startswith("M80"): name = re.sub(r"^M80\s?", "", cmd) self._tuyasmartplug_logger.debug( From 950975c6b4db28af52e209b86b7ec1af8ad869cf Mon Sep 17 00:00:00 2001 From: Carlsans <46719148+Carlsans@users.noreply.github.com> Date: Mon, 19 Feb 2024 01:03:51 -0500 Subject: [PATCH 07/11] Update version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 6f49f4e..07d1d33 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ plugin_name = "OctoPrint-TuyaSmartplug" # The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module -plugin_version = "0.4.0" +plugin_version = "0.4.1" # The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin # module From 528412cc72452580f5da70dc122200c3918ea445 Mon Sep 17 00:00:00 2001 From: Carlsans <46719148+Carlsans@users.noreply.github.com> Date: Tue, 27 Feb 2024 12:39:42 -0500 Subject: [PATCH 08/11] Added TinyTuya automatic discovery You only now need to insert you tuya api key and secret in settings. After, do M1234 in gcode terminal. It may take a while. Refresh your browser window and go back to your settings. Your plugs should now be there. --- octoprint_tuyasmartplug/__init__.py | 134 ++++++++++++++---- .../templates/tuyasmartplug_settings.jinja2 | 26 +++- 2 files changed, 134 insertions(+), 26 deletions(-) diff --git a/octoprint_tuyasmartplug/__init__.py b/octoprint_tuyasmartplug/__init__.py index fe4ec1b..d66fc97 100644 --- a/octoprint_tuyasmartplug/__init__.py +++ b/octoprint_tuyasmartplug/__init__.py @@ -12,6 +12,7 @@ import time import tinytuya import tinytuya.scanner +import tinytuya.wizard from io import StringIO import sys @@ -90,12 +91,73 @@ def get_settings_defaults(self): "useCountdownRules": False, "countdownOnDelay": 0, "countdownOffDelay": 0, + } ], pollingInterval=15, pollingEnabled=False, + apiKey = "", + apiSecret = "", + apiRegion ="us", + + ) + def set_settings_from_tinytuya_apiscan(self,devices): + arrSmartplugs = [] + + for device in devices: + print(device['name']) + print(device['id']) + print(device['ip']) + print(device['key']) + print(device['ver']) + arrSmartplug = { + "ip": device['ip'], + "id": device['id'], + "slot": 1, + "localKey": device['key'], + "label": device['name'], + "icon": "icon-bolt", + "displayWarning": True, + "warnPrinting": False, + "gcodeEnabled": False, + "plugversion": device['ver'], + "gcodeOnDelay": 0, + "gcodeOffDelay": 0, + "autoConnect": True, + "autoConnectDelay": 10.0, + "autoDisconnect": True, + "autoDisconnectDelay": 0, + "sysCmdOn": False, + "sysRunCmdOn": "", + "sysCmdOnDelay": 0, + "sysCmdOff": False, + "sysRunCmdOff": "", + "sysCmdOffDelay": 0, + "currentState": "unknown", + "btnColor": "#808080", + "useCountdownRules": False, + "countdownOnDelay": 0, + "countdownOffDelay": 0, + + } + + arrSmartplugs.append(arrSmartplug) + #print(arrSmartplugs) + settingsvalues = dict( + debug_logging= self._settings.get_boolean(["debug_logging"]), + arrSmartplugs = arrSmartplugs, + pollingInterval = self._settings.get(['pollingInterval']), + pollingEnabled = self._settings.get_boolean(['pollingEnabled']), + apiKey = self._settings.get(['apiKey']), + apiSecret = self._settings.get(['apiSecret']), + apiRegion = self._settings.get(['apiRegion']), ) + arrSmartplugs = dict(arrSmartplugs = arrSmartplugs) + self.on_settings_save(arrSmartplugs) + + + # Retablir plus tard def get_settings_restricted_paths(self): return dict( admin=[ @@ -106,10 +168,12 @@ def get_settings_restricted_paths(self): ) def on_settings_save(self, data): + print(data) old_debug_logging = self._settings.get_boolean(["debug_logging"]) - octoprint.plugin.SettingsPlugin.on_settings_save(self, data) - + print("Before saving",self._settings.get_all_data()) + saveresult = octoprint.plugin.SettingsPlugin.on_settings_save(self, data) + print("\nSetting saved result",saveresult) new_debug_logging = self._settings.get_boolean(["debug_logging"]) if old_debug_logging != new_debug_logging: if new_debug_logging: @@ -144,6 +208,7 @@ def get_template_configs(self): # ~~ SimpleApiPlugin mixin def turn_on(self, pluglabel): + self._tuyasmartplug_logger.debug("Turning on %s." % pluglabel) if self.is_turned_on(pluglabel=pluglabel): self._tuyasmartplug_logger.debug("Plug %s already turned on" % pluglabel) @@ -213,6 +278,8 @@ def turn_off(self, pluglabel): ) def check_status(self, pluglabel, resp=None): + print(self._settings.get(["arrSmartplugs"])) + self._tuyasmartplug_logger.debug("Checking status of %s." % pluglabel) if pluglabel != "": response = resp or self.sendCommand("info", pluglabel) @@ -291,21 +358,6 @@ def sendCommand(self, cmd, pluglabel, args=None): ret = device.status() self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) return ret - # command, arg = commands[cmd] - # func = getattr(device, command, None) - # if not func: - # self._tuyasmartplug_logger.debug("No such command '%s'" % command) - # return False - # if args: - # func(args) - # elif arg is not None: - # func(arg, plug["slot"]) - # else: - # func() - # time.sleep(0.5) - # ret = device.status() - # self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) - # return ret # ~~ Gcode processing hook @@ -320,15 +372,47 @@ def gcode_turn_off(self, plug): def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs): if gcode: self._tuyasmartplug_logger.debug(str(cmd)) - if cmd.startswith("M123"): - tmp = sys.stdout - my_result = StringIO() - sys.stdout = my_result + if cmd.startswith("M1234"): + configurationfilepath = os.getcwd() + "/tinytuya.json" + print("configurationfilepath",configurationfilepath) + configurationjsondict = { + "apiKey" : self._settings.get(["apiKey"]), + "apiSecret" : self._settings.get(["apiSecret"]), + "apiRegion" : self._settings.get(["apiRegion"]), + "apiDeviceID": "scan" + } + print(configurationjsondict) + # Convert and write JSON object to file + with open(configurationfilepath, "w") as outfile: + json.dump(configurationjsondict, outfile) + + tinytuya.wizard.wizard(quicklist=True) + + configurationjson = open(configurationfilepath, "r").read() + print(configurationjson) + snapshotpath = os.getcwd() + "/snapshot.json" + if os.path.exists(snapshotpath): + os.remove(snapshotpath) tinytuya.scanner.scan() - sys.stdout = tmp - print(my_result.getvalue()) - self._tuyasmartplug_logger.debug(my_result.getvalue()) - return my_result.getvalue() + snapshotjson = open(snapshotpath, "r").read() + self._tuyasmartplug_logger.debug(snapshotjson) + scanresults = json.loads(snapshotjson) + self.set_settings_from_tinytuya_apiscan(scanresults['devices']) + if os.path.exists(snapshotpath): + os.remove(snapshotpath) + if os.path.exists(configurationfilepath): + os.remove(configurationfilepath) + formatedscanresults = "" + deviceindex=0 + for device in scanresults['devices']: + formatedscanresults += "Device #" + str(deviceindex) + "\n" + formatedscanresults += "Name : " + device['name'] + "\n" + formatedscanresults += "Id : " + device['id'] + "\n" + formatedscanresults += "Ip address : " + device['ip'] + "\n" + formatedscanresults += "Local Key : " + device['key'] + "\n" + formatedscanresults += "Plug Version : " + device['ver'] + "\n" + deviceindex+=1 + return formatedscanresults if cmd.startswith("M80"): name = re.sub(r"^M80\s?", "", cmd) diff --git a/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 b/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 index 7bbddee..d5c4e97 100644 --- a/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 +++ b/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 @@ -48,6 +48,30 @@ +
+
+ + +
+
+
+
+ + +
+
+
+
+ + +
+