diff --git a/README.md b/README.md index a923761..270932c 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,6 @@ +Most of this ReadMe is obsolete. Here is a [video](http://moduloinfo.ca/tuyavideo.mp4) that teach how to use the new version. If the interest is there, I will make a proper readme file or accept help :) + + # OctoPrint-TuyaSmartplug Work based on [OctoPrint-TPLinkSmartplug](https://github.com/jneilliii/OctoPrint-TPLinkSmartplug) and [python-tuya](https://github.com/clach04/python-tuya). diff --git a/octoprint_tuyasmartplug/__init__.py b/octoprint_tuyasmartplug/__init__.py index 659883d..038e7ce 100644 --- a/octoprint_tuyasmartplug/__init__.py +++ b/octoprint_tuyasmartplug/__init__.py @@ -10,8 +10,11 @@ import re import threading import time - -from octoprint_tuyasmartplug.utils import pytuya +import tinytuya +import tinytuya.scanner +import tinytuya.wizard +from io import StringIO +import sys class tuyasmartplugPlugin( @@ -70,7 +73,7 @@ def get_settings_defaults(self): "displayWarning": True, "warnPrinting": False, "gcodeEnabled": False, - "v33": False, + "plugversion": "1.4", "gcodeOnDelay": 0, "gcodeOffDelay": 0, "autoConnect": True, @@ -88,12 +91,73 @@ def get_settings_defaults(self): "useCountdownRules": False, "countdownOnDelay": 0, "countdownOffDelay": 0, + } ], pollingInterval=15, pollingEnabled=False, + apiKey = "", + apiSecret = "", + apiRegion ="us", + + ) + def set_settings_from_tinytuya_apiscan(self,devices): + arrSmartplugs = [] + + for device in devices: + print(device['name']) + print(device['id']) + print(device['ip']) + print(device['key']) + print(device['ver']) + arrSmartplug = { + "ip": device['ip'], + "id": device['id'], + "slot": 1, + "localKey": device['key'], + "label": device['name'], + "icon": "icon-bolt", + "displayWarning": True, + "warnPrinting": False, + "gcodeEnabled": False, + "plugversion": device['ver'], + "gcodeOnDelay": 0, + "gcodeOffDelay": 0, + "autoConnect": True, + "autoConnectDelay": 10.0, + "autoDisconnect": True, + "autoDisconnectDelay": 0, + "sysCmdOn": False, + "sysRunCmdOn": "", + "sysCmdOnDelay": 0, + "sysCmdOff": False, + "sysRunCmdOff": "", + "sysCmdOffDelay": 0, + "currentState": "unknown", + "btnColor": "#808080", + "useCountdownRules": False, + "countdownOnDelay": 0, + "countdownOffDelay": 0, + + } + + arrSmartplugs.append(arrSmartplug) + #print(arrSmartplugs) + settingsvalues = dict( + debug_logging= self._settings.get_boolean(["debug_logging"]), + arrSmartplugs = arrSmartplugs, + pollingInterval = self._settings.get(['pollingInterval']), + pollingEnabled = self._settings.get_boolean(['pollingEnabled']), + apiKey = self._settings.get(['apiKey']), + apiSecret = self._settings.get(['apiSecret']), + apiRegion = self._settings.get(['apiRegion']), ) + arrSmartplugs = dict(arrSmartplugs = arrSmartplugs) + self.on_settings_save(arrSmartplugs) + + + # Retablir plus tard def get_settings_restricted_paths(self): return dict( admin=[ @@ -104,10 +168,12 @@ def get_settings_restricted_paths(self): ) def on_settings_save(self, data): + print(data) old_debug_logging = self._settings.get_boolean(["debug_logging"]) - octoprint.plugin.SettingsPlugin.on_settings_save(self, data) - + print("Before saving",self._settings.get_all_data()) + saveresult = octoprint.plugin.SettingsPlugin.on_settings_save(self, data) + print("\nSetting saved result",saveresult) new_debug_logging = self._settings.get_boolean(["debug_logging"]) if old_debug_logging != new_debug_logging: if new_debug_logging: @@ -116,7 +182,7 @@ def on_settings_save(self, data): self._tuyasmartplug_logger.setLevel(logging.INFO) def get_settings_version(self): - return 3 + return 4 def on_settings_migrate(self, target, current=None): if current is None or current < self.get_settings_version(): @@ -142,6 +208,7 @@ def get_template_configs(self): # ~~ SimpleApiPlugin mixin def turn_on(self, pluglabel): + self._tuyasmartplug_logger.debug("Turning on %s." % pluglabel) if self.is_turned_on(pluglabel=pluglabel): self._tuyasmartplug_logger.debug("Plug %s already turned on" % pluglabel) @@ -153,13 +220,10 @@ def turn_on(self, pluglabel): self._settings.get(["arrSmartplugs"]), "label", pluglabel ) self._tuyasmartplug_logger.debug(plug) - if plug["useCountdownRules"]: - chk = self.sendCommand( - "countdown", plug["label"], int(plug["countdownOnDelay"]) - ) - else: - chk = self.sendCommand("on", plug["label"]) - + if plug["useCountdownRules"] : + time.sleep(int(plug["countdownOnDelay"])) + chk = self.sendCommand("on", plug["label"]) + self._tuyasmartplug_logger.debug("chk:"+str(chk)) if chk is not False: self.check_status(plug["label"], chk) if plug["autoConnect"]: @@ -167,15 +231,17 @@ def turn_on(self, pluglabel): int(plug["autoConnectDelay"]), self._printer.connect ) c.start() - if plug["sysCmdOn"]: - t = threading.Timer( - int(plug["sysCmdOnDelay"]), os.system, args=[plug["sysRunCmdOn"]] + if plug["sysCmdOn"]: + t = threading.Timer( + int(plug["sysCmdOnDelay"]), + os.system, + args=[plug["sysRunCmdOn"]], + ) + t.start() + else: + self._plugin_manager.send_plugin_message( + self._identifier, dict(currentState="unknown", label=pluglabel) ) - t.start() - else: - self._plugin_manager.send_plugin_message( - self._identifier, dict(currentState="unknown", label=pluglabel) - ) def turn_off(self, pluglabel): self._tuyasmartplug_logger.debug("Turning off %s." % pluglabel) @@ -190,21 +256,19 @@ def turn_off(self, pluglabel): ) self._tuyasmartplug_logger.debug(plug) if plug["useCountdownRules"]: - chk = self.sendCommand( - "countdown", plug["label"], int(plug["countdownOffDelay"]) - ) + time.sleep(int(plug["countdownOffDelay"])) if plug["sysCmdOff"]: t = threading.Timer( int(plug["sysCmdOffDelay"]), os.system, args=[plug["sysRunCmdOff"]] ) t.start() - if plug["autoDisconnect"]: - self._printer.disconnect() - time.sleep(int(plug["autoDisconnectDelay"])) + if plug["autoDisconnect"]: + self._printer.disconnect() + time.sleep(int(plug["autoDisconnectDelay"])) - if not plug["useCountdownRules"]: - chk = self.sendCommand("off", plug["label"]) + + chk = self.sendCommand("off", plug["label"]) if chk is not False: self.check_status(plug["label"], chk) @@ -214,10 +278,15 @@ def turn_off(self, pluglabel): ) def check_status(self, pluglabel, resp=None): + print(self._settings.get(["arrSmartplugs"])) + self._tuyasmartplug_logger.debug("Checking status of %s." % pluglabel) if pluglabel != "": response = resp or self.sendCommand("info", pluglabel) - if response is False: + if not isinstance(response, dict) or "Error" in response: + self._tuyasmartplug_logger.warning( + "Unable to check device status: %s" % response + ) self._plugin_manager.send_plugin_message( self._identifier, dict(currentState="unknown", label=pluglabel) ) @@ -264,14 +333,15 @@ def plug_search(self, lst, key, value): if item[key] == value: return item - def sendCommand(self, cmd, pluglabel, args=None, tries=1): + def sendCommand(self, cmd, pluglabel, args=None): self._tuyasmartplug_logger.debug("Sending command: %s to %s" % (cmd, pluglabel)) plug = self.plug_search( self._settings.get(["arrSmartplugs"]), "label", pluglabel ) - device = pytuya.OutletDevice(plug["id"], plug["ip"], plug["localKey"]) - if plug.get("v33"): - device.version = 3.3 + device = tinytuya.OutletDevice(plug["id"], plug["ip"], plug["localKey"]) + #if plug.get("v33"): + self._tuyasmartplug_logger.debug("Plug version "+str(plug["plugversion"])) + device.set_version(float(plug["plugversion"])) commands = { "info": ("status", None), @@ -280,39 +350,14 @@ def sendCommand(self, cmd, pluglabel, args=None, tries=1): "countdown": ("set_timer", None), } - try: - command, arg = commands[cmd] - func = getattr(device, command, None) - if not func: - self._tuyasmartplug_logger.debug("No such command '%s'" % command) - return False - if args: - func(args) - elif arg is not None: - func(arg, plug["slot"]) - else: - func() - time.sleep(0.5) + if cmd == "on": + ret = device.turn_on(int(plug["slot"])) + if cmd == "off": + ret = device.turn_off(int(plug["slot"])) + if cmd == "info": ret = device.status() - self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) - return ret - except socket.error as e: - if e.errno == 104: - if tries <= 3: - self._tuyasmartplug_logger.debug( - "Connection refused... Trying again soon" - ) - time.sleep(1) - return self.sendCommand(cmd, pluglabel, args, tries + 1) - self._tuyasmartplug_logger.debug("Too many failed attempts") - return False - self._tuyasmartplug_logger.debug("Network error") - return False - except: - self._tuyasmartplug_logger.debug( - "Something went wrong while running the command" - ) - return False + self._tuyasmartplug_logger.debug("Status: %s" % str(ret)) + return ret # ~~ Gcode processing hook @@ -326,6 +371,62 @@ def gcode_turn_off(self, plug): def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs): if gcode: + # To run octoprint server via ssh : + # ssh pi@192.168.2.X + # sudo systemctl stop octoprint.service + # /home/pi/oprint/bin/octoprint serve --host=127.0.0.1 --port=5000 + self._tuyasmartplug_logger.debug(str(cmd)) + if cmd.startswith("M1234"): + tuyaplugindir = os.path.dirname(os.path.realpath(__file__)) + print("tuyaplugindir",tuyaplugindir) + os.chdir(tuyaplugindir) + print("os.getcwd()",os.getcwd()) + configurationfilepath = os.getcwd() + "/tinytuya.json" + #print("configurationfilepath",configurationfilepath) + self._tuyasmartplug_logger.debug(configurationfilepath) + configurationjsondict = { + "apiKey" : self._settings.get(["apiKey"]), + "apiSecret" : self._settings.get(["apiSecret"]), + "apiRegion" : self._settings.get(["apiRegion"]), + "apiDeviceID": "scan" + } + print(configurationjsondict) + # Convert and write JSON object to file + with open(configurationfilepath, "w") as outfile: + json.dump(configurationjsondict, outfile) + + tinytuya.wizard.wizard(quicklist=True) + + configurationjson = open(configurationfilepath, "r").read() + print(configurationjson) + snapshotpath = os.getcwd() + "/snapshot.json" + if os.path.exists(snapshotpath): + os.remove(snapshotpath) + + tinytuya.scanner.scan(forcescan=True,assume_yes=True) + + snapshotjson = open(snapshotpath, "r").read() + self._tuyasmartplug_logger.debug(snapshotjson) + scanresults = json.loads(snapshotjson) + if scanresults['devices'] == []: + return "Scan failed, make sure to power off any devices that may be connected to your plug for the detection step.\n" + self.set_settings_from_tinytuya_apiscan(scanresults['devices']) + if os.path.exists(snapshotpath): + os.remove(snapshotpath) + if os.path.exists(configurationfilepath): + os.remove(configurationfilepath) + formatedscanresults = "" + deviceindex=0 + for device in scanresults['devices']: + formatedscanresults += "Device #" + str(deviceindex) + "\n" + formatedscanresults += "Name : " + device['name'] + "\n" + formatedscanresults += "Id : " + device['id'] + "\n" + formatedscanresults += "Ip address : " + device['ip'] + "\n" + formatedscanresults += "Local Key : " + device['key'] + "\n" + formatedscanresults += "Plug Version : " + device['ver'] + "\n" + deviceindex+=1 + return formatedscanresults + if cmd.startswith("M80"): name = re.sub(r"^M80\s?", "", cmd) self._tuyasmartplug_logger.debug( @@ -338,13 +439,15 @@ def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwar plug = self.plug_search( self._settings.get(["arrSmartplugs"]), "label", name ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOnDelay"]), self.turn_on, args=[plug["label"]] - ) - t.start() - return + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + t = threading.Timer( + int(plug["gcodeOnDelay"]), + self.turn_on, + args=[plug["label"]], + ) + t.start() + return elif cmd.startswith("M81"): name = re.sub(r"^M81\s?", "", cmd) self._tuyasmartplug_logger.debug( @@ -357,50 +460,68 @@ def processGCODE(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwar plug = self.plug_search( self._settings.get(["arrSmartplugs"]), "label", name ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOffDelay"]), self.gcode_turn_off, args=[plug] - ) - t.start() - return - else: - return - - elif cmd.startswith("@TUYAON"): - name = re.sub(r"^@TUYAON\s?", "", cmd) - self._tuyasmartplug_logger.debug( - "Received @TUYAON command, attempting power on of %s." % name - ) - plug = self.plug_search(self._settings.get(["arrSmartplugs"]), "ip", name) - if not plug: - plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "label", name - ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOnDelay"]), self.turn_on, args=[plug["label"]] + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + #self.gcode_turn_off(plug) + t = threading.Timer( + int(plug["gcodeOffDelay"]), + self.gcode_turn_off, + args=[plug], + ) + t.start() + return + else: + return + + elif cmd.startswith("@TUYAON"): + name = re.sub(r"^@TUYAON\s?", "", cmd) + self._tuyasmartplug_logger.debug( + "Received @TUYAON command, attempting power on of %s." + % name ) - t.start() - return None - elif cmd.startswith("@TUYAOFF"): - name = re.sub(r"^@TUYAOFF\s?", "", cmd) - self._tuyasmartplug_logger.debug( - "Received TUYAOFF command, attempting power off of %s." % name - ) - plug = self.plug_search(self._settings.get(["arrSmartplugs"]), "ip", name) - if not plug: plug = self.plug_search( - self._settings.get(["arrSmartplugs"]), "label", name - ) - self._tuyasmartplug_logger.debug(plug) - if plug["gcodeEnabled"]: - t = threading.Timer( - int(plug["gcodeOffDelay"]), self.gcode_turn_off, args=[plug] + self._settings.get(["arrSmartplugs"]), "ip", name ) - t.start() - return None + if not plug: + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), "label", name + ) + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + t = threading.Timer( + int(plug["gcodeOnDelay"]), + self.turn_on, + args=[plug["label"]], + ) + t.start() + return None + elif cmd.startswith("@TUYAOFF"): + name = re.sub(r"^@TUYAOFF\s?", "", cmd) + self._tuyasmartplug_logger.debug( + "Received TUYAOFF command, attempting power off of %s." + % name + ) + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), + "ip", + name, + ) + if not plug: + plug = self.plug_search( + self._settings.get(["arrSmartplugs"]), + "label", + name, + ) + self._tuyasmartplug_logger.debug(plug) + if plug["gcodeEnabled"]: + #self.gcode_turn_off(plug) + t = threading.Timer( + int(plug["gcodeOffDelay"]), + self.gcode_turn_off, + args=[plug], + ) + t.start() + return None # ~~ Softwareupdate hook diff --git a/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 b/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 index acbf0d5..57e7575 100644 --- a/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 +++ b/octoprint_tuyasmartplug/templates/tuyasmartplug_settings.jinja2 @@ -11,7 +11,6 @@ - @@ -49,6 +48,34 @@ +

After inserting API key, secret and region, use M1234 in gcode console. It may take a while so be patient.

+

Plug info will appear in console if successful and be inserted in tuya plugin settings.

+

This will erase previous plugs !

+

Do not use while printing !!!

+
+
+ + +
+
+
+
+ + +
+
+
+
+ + +
+