diff --git a/README.md b/README.md index 675dfe0a..81c6de5b 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,13 @@ broadlink.setup('myssid', 'mynetworkpass', 3) Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) +#### Advanced options + +You may need to specify a broadcast address if setup is not working. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255') +``` + ### Discovery Use this function to discover devices: @@ -61,17 +68,19 @@ devices = broadlink.discover() #### Advanced options You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. +Using the IP address of your local machine: ```python3 -devices = broadlink.discover(local_ip_address='192.168.0.100') # IP address of your local machine. +devices = broadlink.discover(local_ip_address='192.168.0.100') ``` +Using the broadcast address of your subnet: ```python3 -devices = broadlink.discover(discover_ip_address='192.168.0.255') # Broadcast address of your subnet. +devices = broadlink.discover(discover_ip_address='192.168.0.255') ``` If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: ```python3 -device = broadlink.hello('192.168.0.16') # IP address of your Broadlink device. +device = broadlink.hello('192.168.0.16') ``` If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: @@ -106,23 +115,33 @@ packet = device.check_data() ### Learning RF codes -Learning IR codes takes place in five steps. +Learning RF codes takes place in six steps. 1. Sweep the frequency: ```python3 device.sweep_frequency() ``` 2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. -3. Enter learning mode: +3. Check if the frequency was successfully identified: +```python3 +ok = device.check_frequency() +if ok: + print('Frequency found!') +``` +4. Enter learning mode: ```python3 device.find_rf_packet() ``` -4. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. -5. Get the RF packet: +5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. +6. Get the RF packet: ```python3 packet = device.check_data() ``` +#### Notes + +Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`. + ### Canceling learning You can exit the learning mode in the middle of the process by calling this method: diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 1cf7deb1..d3135501 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 """The python-broadlink library.""" import socket -import typing as t +from typing import Generator, List, Optional, Tuple, Union from . import exceptions as e from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C -from .climate import hysen -from .cover import dooya +from .climate import hvac, hysen +from .cover import dooya, dooya2, wser from .device import Device, ping, scan from .hub import s3 from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro -from .sensor import a1 -from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b +from .sensor import a1, a2 +from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { sp1: { @@ -148,14 +148,19 @@ 0x653C: ("RM4 pro", "Broadlink"), }, a1: { - 0x2714: ("e-Sensor", "Broadlink"), + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), }, mp1: { 0x4EB5: ("MP1-1K4S", "Broadlink"), - 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), 0x4F65: ("MP1-1K3S2U", "Broadlink"), }, + mp1s: { + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + }, lb1: { 0x5043: ("SB800TD", "Broadlink (OEM)"), 0x504E: ("LB1", "Broadlink"), @@ -177,26 +182,38 @@ S1C: { 0x2722: ("S2KIT", "Broadlink"), }, - s3: { + s3: { 0xA59C: ("S3", "Broadlink"), 0xA64D: ("S3", "Broadlink"), }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, hysen: { 0x4EAD: ("HY02/HY03", "Hysen"), }, dooya: { 0x4E4D: ("DT360E-45/20", "Dooya"), }, + dooya2: { + 0x4F6E: ("DT360E-45/20", "Dooya"), + }, + wser: { + 0x4F6C: ("WSER", "Wistar"), + }, bg1: { 0x51E3: ("BG800/BG900", "BG Electrical"), }, + ehc31: { + 0x6480: ("EHC31", "BG Electrical"), + }, } def gendevice( dev_type: int, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], name: str = "", is_locked: bool = False, ) -> Device: @@ -222,7 +239,7 @@ def gendevice( def hello( - host: str, + ip_address: str, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT, ) -> Device: @@ -232,7 +249,11 @@ def hello( """ try: return next( - xdiscover(timeout=timeout, discover_ip_address=host, discover_ip_port=port) + xdiscover( + timeout=timeout, + discover_ip_address=ip_address, + discover_ip_port=port, + ) ) except StopIteration as err: raise e.NetworkTimeoutError( @@ -244,33 +265,42 @@ def hello( def discover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.List[Device]: +) -> List[Device]: """Discover devices connected to the local network.""" - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) return [gendevice(*resp) for resp in responses] def xdiscover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[Device, None, None]: +) -> Generator[Device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. """ - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) for resp in responses: yield gendevice(*resp) # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid: str, password: str, security_mode: int) -> None: +def setup( + ssid: str, + password: str, + security_mode: int, + ip_address: str = DEFAULT_BCAST_ADDR, +) -> None: """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) @@ -299,5 +329,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, (DEFAULT_BCAST_ADDR, DEFAULT_PORT)) + sock.sendto(payload, (ip_address, DEFAULT_PORT)) sock.close() diff --git a/broadlink/climate.py b/broadlink/climate.py old mode 100644 new mode 100755 index eee5f119..1a0c6006 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,5 +1,7 @@ -"""Support for HVAC units.""" -import typing as t +"""Support for climate control.""" +import enum +import struct +from typing import List, Sequence from . import exceptions as e from .device import Device @@ -19,7 +21,7 @@ class hysen(Device): TYPE = "HYS" - def send_request(self, request: t.Sequence[int]) -> bytes: + def send_request(self, request: Sequence[int]) -> bytes: """Send a request to the device.""" packet = bytearray() packet.extend((len(request) + 2).to_bytes(2, "little")) @@ -31,27 +33,34 @@ def send_request(self, request: t.Sequence[int]) -> bytes: payload = self.decrypt(response[0x38:]) p_len = int.from_bytes(payload[:0x02], "little") - if p_len + 2 > len(payload): - raise ValueError( - "hysen_response_error", "first byte of response is not length" - ) - - nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") real_crc = CRC16.calculate(payload[0x02:p_len]) + if nom_crc != real_crc: - raise ValueError("hysen_response_error", "CRC check on response failed") + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) return payload[0x02:p_len] + def _decode_temp(self, payload, base_index): + base_temp = payload[base_index] / 2.0 + add_offset = (payload[4] >> 3) & 1 # should offset be added? + offset_raw_value = (payload[17] >> 4) & 3 # offset value + offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 + return base_temp + offset + def get_temp(self) -> float: """Return the room temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[0x05] / 2.0 + return self._decode_temp(payload, 5) def get_external_temp(self) -> float: """Return the external temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[18] / 2.0 + return self._decode_temp(payload, 18) def get_full_status(self) -> dict: """Return the state of the device. @@ -64,9 +73,10 @@ def get_full_status(self) -> dict: data["power"] = payload[4] & 1 data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 - data["room_temp"] = payload[5] / 2.0 + data["heating_cooling"] = (payload[4] >> 7) & 1 + data["room_temp"] = self._decode_temp(payload, 5) data["thermostat_temp"] = payload[6] / 2.0 - data["auto_mode"] = payload[7] & 0xF + data["auto_mode"] = payload[7] & 0x0F data["loop_mode"] = payload[7] >> 4 data["sensor"] = payload[8] data["osv"] = payload[9] @@ -79,7 +89,7 @@ def get_full_status(self) -> dict: data["fre"] = payload[15] data["poweron"] = payload[16] data["unknown"] = payload[17] - data["external_temp"] = payload[18] / 2.0 + data["external_temp"] = self._decode_temp(payload, 18) data["hour"] = payload[19] data["min"] = payload[20] data["sec"] = payload[21] @@ -117,7 +127,9 @@ def get_full_status(self) -> dict: # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # The sensor command is currently experimental - def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) @@ -185,16 +197,32 @@ def set_temp(self, temp: float) -> None: # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. - def set_power(self, power: int = 1, remote_lock: int = 0) -> None: + # heating_cooling: heating(0) cooling(1) + def set_power( + self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0 + ) -> None: """Set the power state of the device.""" - self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power]) + state = (heating_cooling << 7) + power + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request( - [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] ) # Set timer schedule @@ -203,7 +231,7 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] @@ -226,3 +254,221 @@ def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: request.append(int(weekend[i]["temp"] * 2)) self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 + + return ac_info diff --git a/broadlink/cover.py b/broadlink/cover.py index c0f08abb..75317943 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -1,5 +1,6 @@ """Support for covers.""" import time +from typing import Sequence from . import exceptions as e from .device import Device @@ -8,33 +9,34 @@ class dooya(Device): """Controls a Dooya curtain motor.""" - TYPE = "Dooya DT360E" + TYPE = "DT360E" - def _send(self, magic1: int, magic2: int) -> int: + def _send(self, command: int, attribute: int = 0) -> int: """Send a packet to the device.""" packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xBB - packet[3] = magic1 - packet[4] = magic2 - packet[9] = 0xFA - packet[10] = 0x44 - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) + packet[0x00] = 0x09 + packet[0x02] = 0xBB + packet[0x03] = command + packet[0x04] = attribute + packet[0x09] = 0xFA + packet[0x0A] = 0x44 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) return payload[4] def open(self) -> int: """Open the curtain.""" - return self._send(0x01, 0x00) + return self._send(0x01) def close(self) -> int: """Close the curtain.""" - return self._send(0x02, 0x00) + return self._send(0x02) def stop(self) -> int: """Stop the curtain.""" - return self._send(0x03, 0x00) + return self._send(0x03) def get_percentage(self) -> int: """Return the position of the curtain.""" @@ -55,3 +57,126 @@ def set_percentage_and_wait(self, new_percentage: int) -> None: time.sleep(0.2) current = self.get_percentage() self.stop() + + +class dooya2(Device): + """Controls a Dooya curtain motor (version 2).""" + + TYPE = "DT360E-2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def open(self) -> None: + """Open the curtain.""" + self._send(2, [0x00, 0x01, 0x00]) + + def close(self) -> None: + """Close the curtain.""" + self._send(2, [0x00, 0x02, 0x00]) + + def stop(self) -> None: + """Stop the curtain.""" + self._send(2, [0x00, 0x03, 0x00]) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, [0x00, 0x06, 0x00]) + return resp[0x11] + + def set_percentage(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + self._send(2, [0x00, 0x09, new_percentage]) + + +class wser(Device): + """Controls a Wistar curtain motor""" + + TYPE = "WSER" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def get_position(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, []) + position = resp[0x0E] + return position + + def open(self) -> int: + """Open the curtain.""" + resp = self._send(2, [0x4A, 0x31, 0xA0]) + position = resp[0x0E] + return position + + def close(self) -> int: + """Close the curtain.""" + resp = self._send(2, [0x61, 0x32, 0xA0]) + position = resp[0x0E] + return position + + def stop(self) -> int: + """Stop the curtain.""" + resp = self._send(2, [0x4C, 0x73, 0xA0]) + position = resp[0x0E] + return position + + def set_position(self, position: int) -> int: + """Set the position of the curtain.""" + resp = self._send(2, [position, 0x70, 0xA0]) + position = resp[0x0E] + return position diff --git a/broadlink/device.py b/broadlink/device.py index 74e916f4..5a10bc01 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -3,7 +3,7 @@ import threading import random import time -import typing as t +from typing import Generator, Optional, Tuple, Union from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -17,15 +17,15 @@ ) from .protocol import Datetime -HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] +HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] def scan( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[HelloResponse, None, None]: +) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -76,7 +76,7 @@ def scan( conn.close() -def ping(address: str, port: int = DEFAULT_PORT) -> None: +def ping(ip_address: str, port: int = DEFAULT_PORT) -> None: """Send a ping packet to an address. This packet feeds the watchdog timer of firmwares >= v53. @@ -87,7 +87,7 @@ def ping(address: str, port: int = DEFAULT_PORT) -> None: conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) packet = bytearray(0x30) packet[0x26] = 1 - conn.sendto(packet, (address, port)) + conn.sendto(packet, (ip_address, port)) class Device: @@ -100,8 +100,8 @@ class Device: def __init__( self, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], devtype: int, timeout: int = DEFAULT_TIMEOUT, name: str = "", diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 6ee54991..e7b3d4c9 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -1,5 +1,5 @@ """Helper functions and classes.""" -import typing as t +from typing import Dict, List, Sequence class CRC16: @@ -8,10 +8,10 @@ class CRC16: CRC tables are cached for performance. """ - _cache: t.Dict[int, t.List[int]] = {} + _cache: Dict[int, List[int]] = {} @classmethod - def get_table(cls, polynomial: int) -> t.List[int]: + def get_table(cls, polynomial: int) -> List[int]: """Return the CRC-16 table for a polynomial.""" try: crc_table = cls._cache[polynomial] @@ -31,7 +31,7 @@ def get_table(cls, polynomial: int) -> t.List[int]: @classmethod def calculate( cls, - sequence: t.Sequence[int], + sequence: Sequence[int], polynomial: int = 0xA001, # CRC-16-ANSI. init_value: int = 0xFFFF, ) -> int: diff --git a/broadlink/hub.py b/broadlink/hub.py index 07b02e82..0fd4ae53 100644 --- a/broadlink/hub.py +++ b/broadlink/hub.py @@ -1,6 +1,7 @@ """Support for hubs.""" import struct import json +from typing import Optional from . import exceptions as e from .device import Device @@ -12,25 +13,37 @@ class s3(Device): TYPE = "S3" MAX_SUBDEVICES = 8 - def get_subdevices(self) -> list: - """Return the lit of sub devices.""" + def get_subdevices(self, step: int = 5) -> list: + """Return a list of sub devices.""" + total = self.MAX_SUBDEVICES sub_devices = [] - step = 5 + seen = set() + index = 0 - for index in range(0, self.MAX_SUBDEVICES, step): + while index < total: state = {"count": step, "index": index} packet = self._encode(14, state) resp = self.send_packet(0x6A, packet) e.check_error(resp[0x22:0x24]) resp = self._decode(resp) - sub_devices.extend(resp["list"]) - if len(sub_devices) == resp["total"]: + for device in resp["list"]: + did = device["did"] + if did in seen: + continue + + seen.add(did) + sub_devices.append(device) + + total = resp["total"] + if len(seen) >= total: break + index += step + return sub_devices - def get_state(self, did: str = None) -> dict: + def get_state(self, did: Optional[str] = None) -> dict: """Return the power state of the device.""" state = {} if did is not None: @@ -43,10 +56,10 @@ def get_state(self, did: str = None) -> dict: def set_state( self, - did: str = None, - pwr1: bool = None, - pwr2: bool = None, - pwr3: bool = None, + did: Optional[str] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -69,7 +82,9 @@ def _encode(self, flag: int, state: dict) -> bytes: # flag: 1 for reading, 2 for writing. packet = bytearray(12) data = json.dumps(state, separators=(",", ":")).encode() - struct.pack_into(" dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - red: int = None, - blue: int = None, - green: int = None, - brightness: int = None, - colortemp: int = None, - hue: int = None, - saturation: int = None, - transitionduration: int = None, - maxworktime: int = None, - bulb_colormode: int = None, - bulb_scenes: str = None, - bulb_scene: str = None, - bulb_sceneidx: int = None, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + bulb_sceneidx: Optional[int] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -101,7 +102,7 @@ def _decode(self, response: bytes) -> dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - red: int = None, - blue: int = None, - green: int = None, - brightness: int = None, - colortemp: int = None, - hue: int = None, - saturation: int = None, - transitionduration: int = None, - maxworktime: int = None, - bulb_colormode: int = None, - bulb_scenes: str = None, - bulb_scene: str = None, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -183,7 +184,9 @@ def _encode(self, flag: int, state: dict) -> bytes: # flag: 1 for reading, 2 for writing. packet = bytearray(12) data = json.dumps(state, separators=(",", ":")).encode() - struct.pack_into(" dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" bytes: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) + + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError as err: + raise ValueError("Malformed data.") from err + index += 2 + + result.append(int(chunk * tick)) + + return result + + class rmmini(Device): """Controls a Broadlink RM mini 3.""" @@ -46,14 +88,19 @@ def sweep_frequency(self) -> None: """Sweep frequency.""" self._send(0x19) - def check_frequency(self) -> bool: + def check_frequency(self) -> Tuple[bool, float]: """Return True if the frequency was identified successfully.""" resp = self._send(0x1A) - return resp[0] == 1 + is_found = bool(resp[0]) + frequency = struct.unpack(" None: + def find_rf_packet(self, frequency: Optional[float] = None) -> None: """Enter radiofrequency learning mode.""" - self._send(0x1B) + payload = bytearray() + if frequency: + payload += struct.pack(" None: """Cancel sweep frequency.""" @@ -82,7 +129,7 @@ def _send(self, command: int, data: bytes = b"") -> bytes: e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) p_len = struct.unpack(" dict: def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - data = payload[0x4:] + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + data = self.decrypt(resp[0x38:]) - temperature = struct.unpack("> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) return { - "temperature": temperature, - "humidity": humidity, - "light": data[0x4], - "air_quality": data[0x6], - "noise": data[0x8], + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], } diff --git a/broadlink/switch.py b/broadlink/switch.py index 1079cde0..8393f6b1 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -1,6 +1,7 @@ """Support for switches.""" import json import struct +from typing import Optional from . import exceptions as e from .device import Device @@ -127,12 +128,12 @@ def set_nightlight(self, ntlight: bool) -> None: def set_state( self, - pwr: bool = None, - ntlight: bool = None, - indicator: bool = None, - ntlbrightness: int = None, - maxworktime: int = None, - childlock: bool = None, + pwr: Optional[bool] = None, + ntlight: Optional[bool] = None, + indicator: Optional[bool] = None, + ntlbrightness: Optional[int] = None, + maxworktime: Optional[int] = None, + childlock: Optional[bool] = None, ) -> dict: """Set state of device.""" state = {} @@ -186,7 +187,7 @@ def _decode(self, response: bytes) -> dict: e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - pwr1: bool = None, - pwr2: bool = None, - maxworktime: int = None, - maxworktime1: int = None, - maxworktime2: int = None, - idcbrightness: int = None, + pwr: Optional[bool] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + maxworktime: Optional[int] = None, + maxworktime1: Optional[int] = None, + maxworktime2: Optional[int] = None, + idcbrightness: Optional[int] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -291,7 +292,16 @@ def _encode(self, flag: int, state: dict) -> bytes: data = json.dumps(state).encode() length = 12 + len(data) struct.pack_into( - " dict: """Decode a message.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if maxworktime3 is not None: + state["maxworktime3"] = maxworktime3 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + if childlock is not None: + state["childlock"] = int(bool(childlock)) + if childlock1 is not None: + state["childlock1"] = int(bool(childlock1)) + if childlock2 is not None: + state["childlock2"] = int(bool(childlock2)) + if childlock3 is not None: + state["childlock3"] = int(bool(childlock3)) + if childlock4 is not None: + state["childlock4"] = int(bool(childlock4)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + class mp1(Device): """Controls a Broadlink MP1.""" @@ -360,3 +426,47 @@ def check_power(self) -> dict: "s3": bool(data & 4), "s4": bool(data & 8), } + + +class mp1s(mp1): + """Controls a Broadlink MP1S.""" + + TYPE = "MP1S" + + def get_state(self) -> dict: + """Return the power state of the device. + + voltage in V. + current in A. + power in W. + power consumption in kW·h. + """ + packet = bytearray(16) + packet[0x00] = 0x0E + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + packet[0x0A] = 0x04 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + payload_str = payload.hex()[4:-6] + + def get_value(start, end, factors): + value = sum( + int(payload_str[i-2:i]) * factor + for i, factor in zip(range(start, end, -2), factors) + ) + return value + + return { + "volt": get_value(34, 30, [10, 0.1]), + "current": get_value(40, 34, [1, 0.01, 0.0001]), + "power": get_value(46, 40, [100, 1, 0.01]), + "totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]), + } diff --git a/cli/README.md b/cli/README.md index 7b40b4cf..b7e48dc9 100644 --- a/cli/README.md +++ b/cli/README.md @@ -97,7 +97,7 @@ broadlink_cli --device @BEDROOM.device --temperature #### Check humidity ``` -broadlink_cli --device @BEDROOM.device --temperature +broadlink_cli --device @BEDROOM.device --humidity ``` ### Smart plugs diff --git a/cli/broadlink_cli b/cli/broadlink_cli index f7a24ade..7913e332 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,68 +1,32 @@ #!/usr/bin/env python3 import argparse import base64 -import codecs import time +from typing import List import broadlink from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data -TICK = 32.84 TIMEOUT = 30 -IR_TOKEN = 0x26 def auto_int(x): return int(x, 0) -def to_microseconds(bytes): - result = [] - # print bytes[0] # 0x26 = 38for IR - index = 4 - while index < len(bytes): - chunk = bytes[index] - index += 1 - if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] - index += 2 - result.append(int(round(chunk * TICK))) - if chunk == 0x0d05: - break - return result - - -def durations_to_broadlink(durations): - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur / TICK)) - if num > 255: - result.append(0) - result.append(num / 256) - result.append(num % 256) - return result - - -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result +def format_pulses(pulses: List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: List[str]) -> List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -83,7 +47,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning") +parser.add_argument("--rflearn", action="store_true", help="rf scan learning") +parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") parser.add_argument("--learnfile", help="save learned command to a specified file") parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format") @@ -111,8 +76,8 @@ if args.joinwifi: if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = to_microseconds(data) - print(format_durations(durations)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: print(dev.check_temperature()) if args.humidity: @@ -124,10 +89,13 @@ if args.sensors: for key in data: print("{} {}".format(key, data[key])) if args.send: - data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) -if args.learn or (args.learnfile and not args.rfscanlearn): +if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() print("Learning...") start = time.time() @@ -143,17 +111,19 @@ if args.learn or (args.learnfile and not args.rfscanlearn): print("No data received...") exit(1) - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): print('* ON *') @@ -195,28 +165,33 @@ if args.switch: else: dev.set_power(True) print('* Switch to ON *') -if args.rfscanlearn: - dev.sweep_frequency() - print("Learning RF Frequency, press and hold the button to learn...") - - start = time.time() - while time.time() - start < TIMEOUT: - time.sleep(1) - if dev.check_frequency(): - break +if args.rflearn: + if args.frequency: + frequency = args.frequency + print("Press the button you want to learn, a short press...") else: - print("RF Frequency not found") - dev.cancel_sweep_frequency() - exit(1) + dev.sweep_frequency() + print("Detecting radiofrequency, press and hold the button to learn...") + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found") + dev.cancel_sweep_frequency() + exit(1) - print("Found RF Frequency - 1 of 2!") - print("You can now let go of the button") + print("Radiofrequency detected: {}MHz".format(frequency)) + print("You can now let go of the button") - input("Press enter to continue...") + input("Press enter to continue...") - print("To complete learning, single press the button you want to learn") + print("Press the button again, now a short press.") - dev.find_rf_packet() + dev.find_rf_packet(frequency) start = time.time() while time.time() - start < TIMEOUT: @@ -231,15 +206,16 @@ if args.rfscanlearn: print("No data received...") exit(1) - print("Found RF Frequency - 2 of 2!") - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learnfile is None: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) - if args.learnfile is not None: + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt)